Kafka高級特性解析(二)

Kafka高級特性解析(二)

主題

管理

使用kafka-topics.sh腳本:

選項 說明
--config <String: name=value> 為創(chuàng)建的或修改的主題指定配置信息辣往。支持下述 配置條目:<br />cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
--create 創(chuàng)建一個新主題
--delete 刪除一個主題
--delete-config <String: name> 刪除現(xiàn)有主題的一個主題配置條目权埠。這些條目就 是在 --config 中給出的配置條目。
--alter 更改主題的分區(qū)數(shù)量袍暴,副本分配和/或配置條目众眨。
--describe 列出給定主題的細節(jié)。
--disable-rack-aware 禁用副本分配的機架感知容诬。
--force 抑制控制臺提示信息
--help 打印幫助信息
--if-exists 如果指定了該選項,則在修改或刪除主題的時 候沿腰,只有主題存在才可以執(zhí)行览徒。
--if-not-exists 在創(chuàng)建主題的時候,如果指定了該選項颂龙,則只有 主題不存在的時候才可以執(zhí)行命令习蓬。
--list 列出所有可用的主題。
--partitions <Integer: # of partitions> 要創(chuàng)建或修改主題的分區(qū)數(shù)措嵌。
--replica-assignment <String:broker_id_for_part1_replica1 :broker_id_for_part1_replica2 ,broker_id_for_part2_replica1 :broker_id_for_part2_replica2 , ...> 當(dāng)創(chuàng)建或修改主題的時候手動指定partition-to- broker的分配關(guān)系躲叼。
--replication-factor <Integer:replication factor> 要創(chuàng)建的主題分區(qū)副本數(shù)。1表示只有一個副本企巢, 也就是Leader副本枫慷。
--topic <String: topic> 要創(chuàng)建、修改或描述的主題名稱浪规。除了創(chuàng)建或听,修 改和描述在這里還可以使用正則表達式。
--topics-with-overrides if set when describing topics, only show topics that have overridden configs
--unavailable-partitions if set when describing topics, only show partitions whose leader is not available
--under-replicated-partitions if set when describing topics, only show under replicated partitions
--zookeeper <String: urls> 必需的參數(shù):連接zookeeper的字符串笋婿,逗號分 隔的多個host:port列表酷宵。多個URL可以故障轉(zhuǎn) 移递礼。

主題中可以使用的參數(shù)定義:

屬性 默認值 服務(wù)器默認屬性 說明
cleanup.policy delete log.cleanup.policy 要么是”delete“,要么是” compact“; 這個字符串指明了針對舊日志部分的利用方式;默 認方式("delete")將會丟棄舊的部分當(dāng)他們的回收時間或者尺寸限制到達時∩醺伲”compact“將會 進行日志壓
compression.type none producer用于壓縮數(shù)據(jù)的壓縮 類型。默認是無壓縮售睹。正確的選 項值是none尊残、gzip、snappy讨永。 壓縮最好用于批量處理滔驶,批量處 理消息越多,壓縮性能越好卿闹。
delete.retention.ms 86400000 (24 hours) log.cleaner.delete.retention.ms 對于壓縮日志保留的最長時間揭糕, 也是客戶端消費消息的最長時間萝快,通log.retention.minutes的區(qū)別在于一個控制未壓縮數(shù)據(jù), 一個控制壓縮后的數(shù)據(jù)著角。此項配 置可以在topic創(chuàng)建時的置頂參 數(shù)覆蓋
flush.ms None log.flush.interval.ms 此項配置用來置頂強制進行 fsync日志到磁盤的時間間隔; 例如揪漩,如果設(shè)置為1000,那么 每1000ms就需要進行一次 fsync吏口。一般不建議使用這個選 項
flush.messages None log.flush.interval.messages 此項配置指定時間間隔:強制進 行fsync日志奄容。例如,如果這個 選項設(shè)置為1产徊,那么每條消息之 后都需要進行fsync昂勒,如果設(shè)置 為5,則每5條消息就需要進行一 次fsync舟铜。一般來說戈盈,建議你不 要設(shè)置這個值。此參數(shù)的設(shè)置, 需要在"數(shù)據(jù)可靠性"與"性能"之 間做必要的權(quán)衡.如果此值過大, 將會導(dǎo)致每次"fsync"的時間較長 (IO阻塞),如果此值過小,將會導(dǎo) 致"fsync"的次數(shù)較多,這也意味 著整體的client請求有一定的延 遲.物理server故障,將會導(dǎo)致沒 有fsync的消息丟失.
index.interval.bytes 4096 log.index.interval.bytes 默認設(shè)置保證了我們每4096個 字節(jié)就對消息添加一個索引谆刨,更 多的索引使得閱讀的消息更加靠 近塘娶,但是索引規(guī)模卻會由此增 大;一般不需要改變這個選項
max.message.bytes 1000000 max.message.bytes kafka追加消息的最大尺寸。注 意如果你增大這個尺寸痊夭,你也必 須增大你consumer的fetch 尺 寸刁岸,這樣consumer才能fetch到 這些最大尺寸的消息。
min.cleanable.dirty.ratio 0.5 min.cleanable.dirty.ratio 此項配置控制log壓縮器試圖進 行清除日志的頻率她我。默認情況 下虹曙,將避免清除壓縮率超過50% 的日志。這個比率避免了最大的 空間浪費
min.insync.replicas 1 min.insync.replicas 當(dāng)producer設(shè)置 request.required.acks為-1時鸦难, min.insync.replicas指定 replicas的最小數(shù)目(必須確認 每一個repica的寫數(shù)據(jù)都是成功 的)根吁,如果這個數(shù)目沒有達到, producer會產(chǎn)生異常合蔽。
retention.bytes None log.retention.bytes 如果使用“delete”的retention 策 略击敌,這項配置就是指在刪除日志 之前,日志所能達到的最大尺 寸拴事。默認情況下沃斤,沒有尺寸限制 而只有時間限制
retention.ms 7 days log.retention.minutes 如果使用“delete”的retention策 略,這項配置就是指刪除日志前 日志保存的時間刃宵。
segment.bytes 1GB log.segment.bytes kafka中l(wèi)og日志是分成一塊塊存 儲的衡瓶,此配置是指log日志劃分 成塊的大小
segment.index.bytes 10MB log.index.size.max.bytes 此配置是有關(guān)offsets和文件位置 之間映射的索引文件的大小;一 般不需要修改這個配置
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.
segment.ms 7 days log.roll.hours 即使log的分塊文件沒有達到需 要刪除、壓縮的大小牲证,一旦log 的時間達到這個上限哮针,就會強制 新建一個log分塊文件
unclean.leader.election.enable true 指明了是否能夠使不在ISR中 replicas設(shè)置用來作為leader
創(chuàng)建主題
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x -partitions 1 --replication-factor 1

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_y --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
查看主題
## 查看所有主題名稱
kafka-topics.sh --zookeeper localhost:2181/myKafka --list
## 查看主題詳細信息
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_y
## 查看主題配置被修改的主題
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe
修改主題
## 給主題修改配置
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_x --config max.message.bytes=1048576
## 給主題修改配置
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_x --config segment.bytes=10485760
## 刪除該主題的某個配置
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config segment.bytes --topic topic_x
刪除主題
kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x

可以在/mnt/module/kafka_2.12-1.0.2/kafka-logs看到,給主題添加刪除的標記,要過一段時間刪除十厢。

增加分區(qū)

通過命令行工具操作等太,主題的分區(qū)只能增加,不能減少蛮放。否則報錯:

ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.

通過--alter修改主題的分區(qū)數(shù)缩抡,增加分區(qū)。

kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_y --partitions 5

分區(qū)副本的分配-了解

副本分配的三個目標:

  1. 均衡地將副本分散于各個broker上
  2. 對于某個broker上分配的分區(qū)包颁,它的其他副本在其他broker上
  3. 如果所有的broker都有機架信息瞻想,盡量將分區(qū)的各個副本分配到不同機架上的broker。

在不考慮機架信息的情況下:

  1. 第一個副本分區(qū)通過輪詢的方式挑選一個broker娩嚼,進行分配蘑险。該輪詢從broker列表的隨機位置進行輪詢。
  2. 其余副本通過增加偏移進行分配岳悟。

分配案例:

broker-0 broker-1 broker-2 broker-3 broker-4
p0 p1 p2 P3 P4 1st replica
p5 P6 p7 p8 p9 1st replica
p4 p0 p1 p2 P3 2nd replica
p8 p9 p5 p6 p7 2nd replica
p3 p4 p0 p1 P2 3nd replica
p7 p8 p9 p5 p6 3nd replica
副本分配源碼.png

考慮到機架信息漠其,首先為每個機架創(chuàng)建一個broker列表。如: 三個機架(rack1竿音,rack2, rack3)拴驮,六個broker(0春瞬,1,2套啤,3宽气,4,5)

brokerID -> rack
0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"

rack1:0潜沦,5

rack2:3萄涯,4

rack3:1,2

這broker列表為rack1的0唆鸡,rack2的3涝影,rack3的1,rack1的5争占,rack2的4燃逻,rack3的2 即:0, 3, 1, 5, 4, 2通過簡單的輪詢將分區(qū)分配給不同機架上的broker:


不同機架副本分配.png

每個分區(qū)副本在分配的時候在上一個分區(qū)第一個副本開始分配的位置右移一位。

六個broker臂痕,六個分區(qū)伯襟,正好最后一個分區(qū)的第一個副本分配的位置是該broker列表的最后一個。 如果有更多的分區(qū)需要分配握童,則算法開始對follower副本進行移位分配姆怪。 這主要是為了避免每次都得到相同的分配序列。 此時,如果有一個分區(qū)等待分配(分區(qū)6)稽揭,這按照如下方式分配:

6 -> 0,4,2 (而不是像分區(qū)0那樣重復(fù)0,3,1)

跟機架相關(guān)的副本分配中俺附,永遠在機架相關(guān)的broker列表中輪詢地分配第一個副本。 其余的副 本淀衣,傾向于機架上沒有副本的broker進行副本分配昙读,除非每個機架有一個副本。 然后其他的副本又通 過輪詢的方式分配給broker膨桥。

結(jié)果是蛮浑,如果副本的個數(shù)大于等于機架數(shù),保證每個機架最少有一個副本只嚣。 否則每個機架最多保有 一個副本沮稚。 如果副本的個數(shù)和機架的個數(shù)相同,并且每個機架包含相同個數(shù)的broker册舞,可以保證副本 在機架和broker之間均勻分布蕴掏。


不同機架副本分配1.png

上圖,tp_eagle_01主題的分區(qū)0分配信息:leader分區(qū)在broker1上调鲸,同步副本分區(qū)是1和2盛杰,也就 是在broker1和broker2上的兩個副本分區(qū)是同步副本分區(qū),其中一個是leader分區(qū)藐石。

必要參數(shù)配置

kafka-topics.sh --config xx=xx --config yy=yy

配置給主題的參數(shù)即供。

屬性 默認值 服務(wù)器默認屬性 說明
cleanup.policy delete log.cleanup.policy 要么是”delete“,要么是” compact“; 這個字符串指明了針對舊日志部分的利用方式;默 認方式("delete")將會丟棄舊的部分當(dāng)他們的回收時間或者尺寸限制到達時于微《旱眨”compact“將會 進行日志壓
compression.type none producer用于壓縮數(shù)據(jù)的壓縮 類型。默認是無壓縮株依。正確的選 項值是none驱证、gzip、snappy恋腕。 壓縮最好用于批量處理抹锄,批量處 理消息越多,壓縮性能越好荠藤。
delete.retention.ms 86400000 (24 hours) log.cleaner.delete.retention.ms 對于壓縮日志保留的最長時間祈远, 也是客戶端消費消息的最長時間,通log.retention.minutes的區(qū)別在于一個控制未壓縮數(shù)據(jù)商源, 一個控制壓縮后的數(shù)據(jù)车份。此項配 置可以在topic創(chuàng)建時的置頂參 數(shù)覆蓋
flush.ms None log.flush.interval.ms 此項配置用來置頂強制進行 fsync日志到磁盤的時間間隔; 例如,如果設(shè)置為1000牡彻,那么 每1000ms就需要進行一次 fsync扫沼。一般不建議使用這個選 項
flush.messages None log.flush.interval.messages 此項配置指定時間間隔:強制進 行fsync日志出爹。例如,如果這個選項設(shè)置為1缎除,那么每條消息之 后都需要進行fsync严就,如果設(shè)置為5,則每5條消息就需要進行一 次fsync器罐。一般來說梢为,建議你不 要設(shè)置這個值。此參數(shù)的設(shè)置, 需要在"數(shù)據(jù)可靠性"與"性能"之 間做必要的權(quán)衡.如果此值過大, 將會導(dǎo)致每次"fsync"的時間較長 (IO阻塞),如果此值過小,將會導(dǎo) 致"fsync"的次數(shù)較多,這也意味 著整體的client請求有一定的延 遲.物理server故障,將會導(dǎo)致沒 有fsync的消息丟失.
index.interval.bytes 4096 log.index.interval.bytes 默認設(shè)置保證了我們每4096個 字節(jié)就對消息添加一個索引轰坊,更 多的索引使得閱讀的消息更加靠 近铸董,但是索引規(guī)模卻會由此增 大;一般不需要改變這個選項
max.message.bytes 1000000 max.message.bytes kafka追加消息的最大尺寸。注意如果你增大這個尺寸肴沫,你也必 須增大你consumer的fetch 尺 寸粟害,這樣consumer才能fetch到 這些最大尺寸的消息。
min.cleanable.dirty.ratio 0.5 min.cleanable.dirty.ratio 此項配置控制log壓縮器試圖進 行清除日志的頻率颤芬。默認情況 下悲幅,將避免清除壓縮率超過50% 的日志。這個比率避免了最大的 空間浪費
min.insync.replicas 1 min.insync.replicas 當(dāng)producer設(shè)置 request.required.acks為-1時站蝠, min.insync.replicas指定 replicas的最小數(shù)目(必須確認每一個repica的寫數(shù)據(jù)都是成功 的)汰具,如果這個數(shù)目沒有達到, producer會產(chǎn)生異常菱魔。
retention.bytes None log.retention.bytes 如果使用“delete”的retention 策 略郁副,這項配置就是指在刪除日志 之前,日志所能達到的最大尺 寸豌习。默認情況下,沒有尺寸限制 而只有時間限制
retention.ms 7 days log.retention.minutes 如果使用“delete”的retention策 略拔疚,這項配置就是指刪除日志前 日志保存的時間肥隆。
segment.bytes 1GB log.segment.bytes kafka中l(wèi)og日志是分成一塊塊存 儲的,此配置是指log日志劃分 成塊的大小
segment.index.bytes 10MB log.index.size.max.bytes 此配置是有關(guān)offsets和文件位置 之間映射的索引文件的大小;一 般不需要修改這個配置
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.
segment.ms 7 days log.roll.hours 即使log的分塊文件沒有達到需 要刪除稚失、壓縮的大小栋艳,一旦log 的時間達到這個上限,就會強制 新建一個log分塊文件
unclean.leader.election.enable true 指明了是否能夠使不在ISR中 replicas設(shè)置用來作為leader

KafkaAdminClient應(yīng)用

說明

除了使用Kafka的bin目錄下的腳本工具來管理Kafka句各,還可以使用管理Kafka的API將某些管理查看 的功能集成到系統(tǒng)中吸占。在Kafka0.11.0.0版本之前,可以通過kafka-core包(Kafka的服務(wù)端凿宾,采用Scala 編寫)中的AdminClient和AdminUtils來實現(xiàn)部分的集群管理操作矾屯。Kafka0.11.0.0之后,又多了一個 AdminClient初厚,在kafka-client包下件蚕,一個抽象類,具體的實現(xiàn)是 org.apache.kafka.clients.admin.KafkaAdminClient。

功能與原理介紹

Kafka官網(wǎng):The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects排作。
KafkaAdminClient包含了一下幾種功能(以Kafka1.0.2版本為準):

  1. 創(chuàng)建主題:

    createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)

  2. 刪除主題:

    deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)

  3. 列出所有主題:

    listTopics(final ListTopicsOptions options)

  4. 查詢主題:

    describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)

  5. 查詢集群信息:

    describeCluster(DescribeClusterOptions options)

  6. 查詢配置信息:

    describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)

  7. 修改配置信息:

    alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)

  8. 修改副本的日志目錄:

    alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)

  9. 查詢節(jié)點的日志目錄信息:

    describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)

  10. 查詢副本的日志目錄信息:

    describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options)

  11. 增加分區(qū):
    createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)

其內(nèi)部原理是使用Kafka自定義的一套二進制協(xié)議來實現(xiàn)牵啦,詳細可以參見Kafka協(xié)議。

用到的參數(shù)
屬性 說明 重要性
bootstrap.servers 向Kafka集群建立初始連接用到的host/port列表妄痪。 客戶端會使用這里列出的所有服務(wù)器進行集群其他 服務(wù)器的發(fā)現(xiàn)哈雏,而不管是否指定了哪個服務(wù)器用作 引導(dǎo)。 這個列表僅影響用來發(fā)現(xiàn)集群所有服務(wù)器的初始主機衫生。<br />字符串形式:host1:port1,host2:port2,... 由于這組服務(wù)器僅用于建立初始鏈接裳瘪,然后發(fā)現(xiàn)集 群中的所有服務(wù)器,因此沒有必要將集群中的所有地址寫在這里障簿。一般最好兩臺盹愚,以防其中一臺宕掉。 High
client.id 生產(chǎn)者發(fā)送請求的時候傳遞給broker的id字符串站故。 用于在broker的請求日志中追蹤什么應(yīng)用發(fā)送了什 么消息皆怕。
一般該id是跟業(yè)務(wù)有關(guān)的字符串。
medium
connections.max.idle.ms 當(dāng)連接空閑時間達到這個值西篓,就關(guān)閉連接愈腾。long型數(shù)據(jù),默認:300000 medium
receive.buffer.bytes TCP接收緩存(SO_RCVBUF)岂津,如果設(shè)置為-1虱黄,則 使用操作系統(tǒng)默認的值。int類型值吮成,默認65536橱乱, 可選值:[-1,...] medium
request.timeout.ms 客戶端等待服務(wù)端響應(yīng)的最大時間。如果該時間超 時粱甫,則客戶端要么重新發(fā)起請求泳叠,要么如果重試耗 盡,請求失敗茶宵。int類型值危纫,默認:120000 medium
security.protocol 跟broker通信的協(xié)議:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. string類型值,默認:PLAINTEXT medium
send.buffer.bytes 用于TCP發(fā)送數(shù)據(jù)時使用的緩沖大小 (SO_SNDBUF)乌庶,-1表示使用OS默認的緩沖區(qū)大 小种蝶。
int類型值,默認值:131072
medium
reconnect.backoff.max.ms 對于每個連續(xù)的連接失敗瞒大,每臺主機的退避將成倍 增加螃征,直至達到此最大值。在計算退避增量之后透敌, 添加20%的隨機抖動以避免連接風(fēng)暴会傲。 long型值锅棕,默認1000,可選值:[0,...] low
reconnect.backoff.ms 重新連接主機的等待時間淌山。避免了重連的密集循 環(huán)裸燎。該等待時間應(yīng)用于該客戶端到broker的所有連 接。
long型值泼疑,默認:50
low
retries The maximum number of times to retry a call before failing it.重試的次數(shù)德绿,達到此值,失敗退渗。 int類型值移稳,默認5。 low
retry.backoff.ms 在發(fā)生失敗的時候如果需要重試会油,則該配置表示客 戶端等待多長時間再發(fā)起重試个粱。 該時間的存在避免了密集循環(huán)。 long型值翻翩,默認值:100都许。 low

主要操作步驟: 客戶端根據(jù)方法的調(diào)用創(chuàng)建相應(yīng)的協(xié)議請求,比如創(chuàng)建Topic的createTopics方法嫂冻,其內(nèi)部就是發(fā)
送CreateTopicRequest請求胶征。
客戶端發(fā)送請求至Kafka Broker。
Kafka Broker處理相應(yīng)的請求并回執(zhí)桨仿,比如與CreateTopicRequest對應(yīng)的是 CreateTopicResponse睛低。 客戶端接收相應(yīng)的回執(zhí)并進行解析處理。
和協(xié)議有關(guān)的請求和回執(zhí)的類基本都在org.apache.kafka.common.requests包中服傍, AbstractRequest和AbstractResponse是這些請求和響應(yīng)類的兩個父類钱雷。
綜上,如果要自定義實現(xiàn)一個功能吹零,只需要三個步驟:

  1. 自定義XXXOptions;
  2. 自定義XXXResult返回值;
  3. 自定義Call罩抗,然后挑選合適的XXXRequest和XXXResponse來實現(xiàn)Call類中的3個抽象方法。
package com.hhb.kafka.admin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * @description:
 * @author: 
 * @date: 2020-08-18 21:35
 **/
public class MyAdminClientTeacher {


    private KafkaAdminClient kafkaAdminClient;


    /**
     * 初始化KafkaAdminClient客戶端
     */
    @Before
    public void init() {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "hhb:9092");


        kafkaAdminClient = (KafkaAdminClient) KafkaAdminClient.create(configs);
    }


    /**
     * 新建一個主題
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testCreateTopic() throws ExecutionException,
            InterruptedException {
        Map<String, String> configs = new HashMap<>();
        configs.put("max.message.bytes", "1048576");
        configs.put("segment.bytes", "1048576000");
        NewTopic newTopic = new NewTopic("hhb", 2, (short) 1);
        newTopic.configs(configs);
        CreateTopicsResult topics = kafkaAdminClient.createTopics(Collections.singleton(newTopic));
        KafkaFuture<Void> all = topics.all();
        Void aVoid = all.get();
        System.out.println(aVoid);
    }

    /**
     * 刪除主題
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testDeleteTopic() throws ExecutionException,
            InterruptedException {
        DeleteTopicsOptions options = new DeleteTopicsOptions();
        options.timeoutMs(500);
        DeleteTopicsResult deleteResult = kafkaAdminClient.deleteTopics(Collections.singleton("hhb"), options);
        deleteResult.all().get();
    }

    /**
     * 修改配置信息
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testAlterTopic() throws ExecutionException, InterruptedException {
        NewPartitions newPartitions = NewPartitions.increaseTo(5);
        Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
        newPartitionsMap.put("adm_tp_01", newPartitions);
        CreatePartitionsOptions option = new CreatePartitionsOptions();
        // Set to true if the request should be validated without creating new partitions.
        // 如果只是驗證瘪校,而不創(chuàng)建分區(qū),則設(shè)置為true
        // option.validateOnly(true);
        CreatePartitionsResult partitionsResult = kafkaAdminClient.createPartitions(newPartitionsMap, option);
        Void aVoid = partitionsResult.all().get();
    }

    /**
     * 查看topic詳細信息
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testDescribeTopics() throws ExecutionException,
            InterruptedException {
        DescribeTopicsOptions options = new DescribeTopicsOptions();
        options.timeoutMs(3000);
        DescribeTopicsResult topicsResult = kafkaAdminClient.describeTopics(Collections.singleton("hhb"), options);
        Map<String, TopicDescription> stringTopicDescriptionMap = topicsResult.all().get();
        stringTopicDescriptionMap.forEach((k, v) -> {
            System.out.println(k + "\t" + v);
            System.out.println("=======================================");
            System.out.println(k);
            boolean internal = v.isInternal();
            String name = v.name();
            List<TopicPartitionInfo> partitions = v.partitions();
            String partitionStr = Arrays.toString(partitions.toArray());
            System.out.println("內(nèi)部的?" + internal);
            System.out.println("topic name = " + name);
            System.out.println("分區(qū):" + partitionStr);
            partitions.forEach(partition -> {
                System.out.println(partition);
            });
        });
    }

    /**
     * 查詢集群信息
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testDescribeCluster() throws ExecutionException,
            InterruptedException {
        DescribeClusterResult describeClusterResult = kafkaAdminClient.describeCluster();
        KafkaFuture<String> stringKafkaFuture = describeClusterResult.clusterId();
        String s = stringKafkaFuture.get();
        System.out.println("cluster name = " + s);
        KafkaFuture<Node> controller = describeClusterResult.controller();
        Node node = controller.get();
        System.out.println("集群控制器:" + node);
        Collection<Node> nodes = describeClusterResult.nodes().get();
        nodes.forEach(node1 -> {
            System.out.println(node1);
        });
    }


    /**
     * 列出所有的主題
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testListTopics() throws ExecutionException, InterruptedException {

        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        //是否獲取內(nèi)置主題
        listTopicsOptions.listInternal(true);
        //請求超時時間名段,毫秒
        listTopicsOptions.timeoutMs(500);
        //列出所有的主題
        ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(listTopicsOptions);
        //列出所有的非內(nèi)置主題
        //ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics();
        //打印所有的主題名稱
        Set<String> strings = listTopicsResult.names().get();
        for (String name : strings) {
            System.err.println("name==>>" + name);
        }
        System.err.println("========================================================================================================");
        //將請求變成同步的阱扬,直接獲取結(jié)果
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        topicListings.forEach(topics -> {
            //是否是一個內(nèi)置的主題,內(nèi)置的主題:_consumer_offsets_
            boolean internal = topics.isInternal();
            //主題名稱
            String name = topics.name();
            System.err.println("是否為內(nèi)部主題:" + internal + ",該主題的名字: " + name + ", toString" + topics.toString());
        });
        System.err.println("========================================================================================================");
    }


    /**
     * 查詢配置信息
     *
     * @throws ExecutionException
     * @throws InterruptedException
     * @throws TimeoutException
     */
    @Test
    public void testDescribeConfigs() throws ExecutionException,
            InterruptedException, TimeoutException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
        DescribeConfigsResult describeConfigsResult =
                kafkaAdminClient.describeConfigs(Collections.singleton(configResource));
        Map<ConfigResource, Config> configMap =
                describeConfigsResult.all().get(15, TimeUnit.SECONDS);
        configMap.forEach(new BiConsumer<ConfigResource, Config>() {
            @Override
            public void accept(ConfigResource configResource, Config config) {
                ConfigResource.Type type = configResource.type();
                String name = configResource.name();
                System.out.println("資源名稱:" + name);
                Collection<ConfigEntry> entries = config.entries();
                entries.forEach(new Consumer<ConfigEntry>() {
                    @Override
                    public void accept(ConfigEntry configEntry) {
                        boolean aDefault = configEntry.isDefault();
                        boolean readOnly = configEntry.isReadOnly();
                        boolean sensitive = configEntry.isSensitive();
                        String name1 = configEntry.name();
                        String value = configEntry.value();
                        System.out.println("是否默認:" + aDefault + "\t是否 只讀?" + readOnly + "\t是否敏感?" + sensitive
                                + "\t" + name1 + " --> " + value);
                    }
                });
                ConfigEntry retries = config.get("retries");
                if (retries != null) {
                    System.out.println(retries.name() + " -->" +
                            retries.value());
                } else {
                    System.out.println("沒有這個屬性");
                }
            }

        });
    }

    @Test
    public void testAlterConfig() throws ExecutionException,
            InterruptedException {
        // 這里設(shè)置后伸辟,原來資源中不沖突的屬性也會丟失麻惶,直接按照這里的配置設(shè)置
        Map<ConfigResource, Config> configMap = new HashMap<>();
        ConfigResource resource = new
                ConfigResource(ConfigResource.Type.TOPIC, "adm_tp_01");
        Config config = new Config(Collections.singleton(new
                ConfigEntry("segment.bytes", "1048576000")));
        configMap.put(resource, config);
        AlterConfigsResult alterConfigsResult = kafkaAdminClient.alterConfigs(configMap);
        Void aVoid = alterConfigsResult.all().get();
    }

    @Test
    public void testDescribeLogDirs() throws ExecutionException,
            InterruptedException {
        DescribeLogDirsOptions option = new DescribeLogDirsOptions();
        option.timeoutMs(1000);
        DescribeLogDirsResult describeLogDirsResult = kafkaAdminClient.describeLogDirs(Collections.singleton(0), option);
        Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap
                = describeLogDirsResult.all().get();
        integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {
            @Override
            public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {
                System.out.println("broker.id = " + integer);
                stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {
                    @Override
                    public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {
                        System.out.println("log.dirs:" + s);

                        // 查看該broker上的主題/分區(qū)/偏移量等信息 //
                        logDirInfo.replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
                            @Override
                            public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
                                int partition = topicPartition.partition();
                                String topic = topicPartition.topic();
                                boolean isFuture = replicaInfo.isFuture;
                                long offsetLag = replicaInfo.offsetLag;
                                long size = replicaInfo.size;
                                System.out.println("partition:" + partition + "\ttopic:" + topic + "\tisFuture:" + isFuture + "\toffsetLag:" + offsetLag + "\tsize:" + size);
                            }
                        });
                    }
                });
            }
        });
    }

    /**
     * 關(guān)閉KafkaAdminClient客戶端
     */
    @After
    public void destroy() {
        kafkaAdminClient.close();
    }
}

偏移量管理

Kafka 1.0.2,__consumer_offsets主題中保存各個消費組的偏移量信夫。早期由zookeeper管理消費組的偏移量窃蹋。

查詢方法:通過原生 kafka 提供的工具腳本進行查詢卡啰。 工具腳本的位置與名稱為 bin/kafka-consumer-groups.sh 首先運行腳本,查看幫助:

參數(shù) 說明
--all-topics 將所有關(guān)聯(lián)到指定消費組的主題都劃歸到reset-offsets 操作范圍警没。
--bootstrap-server<String: server to connect to> 必須:(基于消費組的新的消費者): 要連接的服務(wù)器地址匈辱。
--by-duration<String: duration> 距離當(dāng)前時間戳的一個時間段。格式:'PnDTnHnMnS'
--command-config <String: command config property file> 指定配置文件杀迹,該文件內(nèi)容傳遞給Admin Client和消費者亡脸。
--delete 傳值消費組名稱,刪除整個消費組與所有主題的各個分區(qū)偏移量 和所有者關(guān)系树酪。 如: --group g1 --group g2 浅碾。 傳值消費組名稱和單個主題魁袜,僅刪除該消費組到指定主題的分區(qū) 偏移量和所屬關(guān)系舞痰。如: --group g1 --group g2 --topic t1 。 傳值一個主題名稱牍陌,僅刪除指定主題與所有消費組分區(qū)偏移量以 及所屬關(guān)系疮茄。 如: --topic t1 注意:消費組的刪除僅對基于ZK保存偏移量的消費組有效滥朱,并且 要小心使用,僅刪除不活躍的消費組娃豹。
--describe 描述給定消費組的偏移量差距(有多少消息還沒有消費)焚虱。
--execute 執(zhí)行操作。支持的操作: reset-offsets懂版。
--export 導(dǎo)出操作的結(jié)果到CSV文件鹃栽。支持的操作: reset-offsets 。
--from-file <String: path to CSV file> 重置偏移量到CSV文件中定義的值躯畴。
--group <String: consumer group> 目標消費組民鼓。
--list 列出所有消費組。
--new-consumer 使用新的消費者實現(xiàn)蓬抄。這是默認值丰嘉。隨后的發(fā)行版中會刪除這一 操作。
--reset-offsets 重置消費組的偏移量嚷缭。當(dāng)前一次操作只支持一個消費組饮亏,并且該 消費組應(yīng)該是不活躍的。 有三個操作選項 <br />1. (默認)plan:要重置哪個偏移量阅爽。<br />2. execute:執(zhí)行 reset-offsets 操作<br />3. process:配合 --export 將操作結(jié)果導(dǎo)出到CSV格式路幸。<br />可以使用如下選項:<br />--to-datetime<br /> --by-period<br /> --to-earliest<br /> --to-latest<br /> --shift-by<br /> --from-file<br /> --to-current 。<br />必須選擇一個選項使用付翁。 要定義操作的范圍简肴,使用:<br />--all-topics<br />--topic 。 <br />必須選擇一個百侧,除非使用 --from-file 選項砰识。
--shift-by <Long: number-of-offsets> 重置偏移量n個能扒。n可以是正值,也可以是負值辫狼。
--timeout <Long: timeout (ms)> 對某些操作設(shè)置超時時間初斑。 如:對于描述指定消費組信息,指定毫秒值的最大等待時間予借,以 獲取正常數(shù)據(jù)(如剛創(chuàng)建的消費組越平,或者消費組做了一些更改操 作)。默認時間: 5000 灵迫。
--to-current 重置到當(dāng)前的偏移量秦叛。
--to-datetime 重置偏移量到指定的時間戳。格式:'YYYY-MM- DDTHH:mm:SS.sss'
--to-earliest 重置為最早的偏移量
--to-latest 重置為最新的偏移量
--to-offset <Long: offset> 重置到指定的偏移量瀑粥。
--topic <String: topic> 指定哪個主題的消費組需要刪除挣跋,或者指定哪個主題的消費組需 要包含到 reset-offsets 操作中。對于 reset-offsets 操作狞换,還 可以指定分區(qū): topic1:0,1,2 避咆。其中0,1修噪,2表示要包含到操 作中的分區(qū)號查库。重置偏移量的操作支持多個主題一起操作。
--zookeeper <String: urls> 必須黄琼,它的值樊销,你懂的。 --zookeeper node1:2181/myKafka 脏款。

這里我們先編寫一個生產(chǎn)者围苫,消費者的例子:

我們先啟動消費者,再啟動生產(chǎn)者撤师, 再通過 bin/kafka-consumer-groups.sh 進行消費偏移量查 詢剂府,由于kafka 消費者記錄group的消費偏移量有兩種方式 :

  1. kafka 自維護 (新)

  2. zookpeer 維護 (舊) ,已經(jīng)逐漸被廢棄

所以 剃盾,腳本只查看由broker維護的腺占,由zookeeper維護的可以將 --bootstrap-server 換成--zookeeper即可。

查看有哪些group ID正在進行消費

 kafka-consumer-groups.sh --bootstrap-server hhb:9092 --list

注意:

  1. 這里面是沒有指定 topic痒谴,查看的是所有topic消費者的 group.id 的列表衰伯。

  2. 注意: 重名的 group.id 只會顯示一次

查看指定group.id 的消費者消費情況

kafka-consumer-groups.sh --bootstrap-server hhb:9092 --describe --group group

如果消費者停止,查看偏移量信息:

kafka-consumer-groups.sh --bootstrap-server hhb:9092 --describe --group group

將偏移量設(shè)置為最早的:

kafka-consumer-groups.sh --bootstrap-server hhb:9092 --reset-offsets --group group --topic tp_demo_02 --to-earliest --execute

將偏移量設(shè)置為最新的:

kafka-consumer-groups.sh --bootstrap-server hhb:9092 --reset-offsets --group group --topic tp_demo_02 --to-latest --execute

分別將指定主題的指定分區(qū)的偏移量向前移動10個消息:

kafka-consumer-groups.sh --bootstrap-server hhb:9092 --reset-offsets --group group --topic tp_demo_02:0 --shift-by -10  --execute

將指定主題的多個分區(qū)的偏移量向前移動5個消息:

kafka-consumer-groups.sh --bootstrap-server hhb:9092 --reset-offsets --group group --topic tp_demo_02:0,2 --shift-by -5  --execute
代碼
  • Producer:

MyProducer

package com.hhb.kafka.offset.producer;

/**
 * @description:
 * @author: 
 * @date: 2020-08-19 20:51
 **/
public class MyProducer {
    public static void main(String[] args) {
        Thread thread = new Thread(new ProducerHandler("hello lagou "));
        thread.start();
    }
}

KafkaProducerSingleton:

package com.hhb.kafka.offset.producer;

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Random;

/**
 * @description:
 * @author: 
 * @date: 2020-08-19 20:49
 **/
public class KafkaProducerSingleton {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);
    private static KafkaProducer<String, String> kafkaProducer;
    private Random random = new Random();
    private String topic;
    private int retry;

    private KafkaProducerSingleton() {
    }


    /**
     * 靜態(tài)內(nèi)部類
     *
     * @author tanjie
     */
    private static class LazyHandler {
        private static final KafkaProducerSingleton instance = new
                KafkaProducerSingleton();
    }

    /**
     * 單例模式,kafkaProducer是線程安全的,可以多線程共享一個實例 * @return
     */
    public static final KafkaProducerSingleton getInstance() {
        return LazyHandler.instance;
    }


    /**
     * kafka生產(chǎn)者進行初始化
     *
     * @return KafkaProducer
     */
    public void init(String topic, int retry) {
        this.topic = topic;
        this.retry = retry;
        if (null == kafkaProducer) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
            kafkaProducer = new KafkaProducer<String, String>(props);
        }
    }

    /**
     * 通過kafkaProducer發(fā)送消息 * @param message
     */
    public void sendKafkaMessage(final String message) {
        ProducerRecord<String, String> record = new ProducerRecord<String,
                String>(
                topic, random.nextInt(3), "", message);
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata,
                                     Exception exception) {
                if (null != exception) {
                    LOGGER.error("kafka發(fā)送消息失敗:" + exception.getMessage(), exception);
                    retryKakfaMessage(message);
                }
            }
        });
    }

    /**
     * 當(dāng)kafka消息發(fā)送失敗后,重試 *
     *
     * @param retryMessage
     */
    private void retryKakfaMessage(final String retryMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<String,
                String>(
                topic, random.nextInt(3), "", retryMessage);
        for (int i = 1; i <= retry; i++) {
            try {
                kafkaProducer.send(record);
                return;
            } catch (Exception e) {
                LOGGER.error("kafka發(fā)送消息失敗:" + e.getMessage(), e);
                retryKakfaMessage(retryMessage);
            }
        }
    }

    /**
     * kafka實例銷毀
     */
    public void close() {
        if (null != kafkaProducer) {
            kafkaProducer.close();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }
}

ProducerHandler:

package com.hhb.kafka.offset.producer;

/**
 * @description:
 * @author: 
 * @date: 2020-08-19 20:50
 **/
public class ProducerHandler implements Runnable {

    private String message;

    public ProducerHandler(String message) {
        this.message = message;
    }

    @Override
    public void run() {
        KafkaProducerSingleton kafkaProducerSingleton =
                KafkaProducerSingleton.getInstance();
        kafkaProducerSingleton.init("tp_demo_02", 3);
        int i = 0;
        while (true) {
            try {
                System.out.println("當(dāng)前線程:" + Thread.currentThread().getName()
                        + "\t獲取的kafka實例:" + kafkaProducerSingleton);
                kafkaProducerSingleton.sendKafkaMessage("發(fā)送消息: " +
                        message + " " + (++i));
                Thread.sleep(100);
            } catch (Exception e) {
            }
        }
    }
}
  • consumer

ConsumerAutoMain

package com.hhb.kafka.offset.consumer;

/**
 * @description:
 * @author: 
 * @date: 2020-08-19 20:54
 **/
public class ConsumerAutoMain {
    public static void main(String[] args) {
        KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
        try {
            kafka_consumerAuto.execute();
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafka_consumerAuto.shutdown();
        }
    }
}

ConsumerThreadAuto:

package com.hhb.kafka.offset.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @description:
 * @author: 
 * @date: 2020-08-19 20:53
 **/
public class ConsumerThreadAuto implements Runnable {
    private ConsumerRecords<String, String> records;
    private KafkaConsumer<String, String> consumer;

    public ConsumerThreadAuto(ConsumerRecords<String, String> records,
                              KafkaConsumer<String, String> consumer) {
        this.records = records;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("當(dāng)前線程:" + Thread.currentThread()
                    + "\t主題:" + record.topic()
                    + "\t偏移量:" + record.offset() + "\t分區(qū):" + record.partition() + "\t獲取的消息:" + record.value());
        }
    }
}

KafkaConsumerAuto:

package com.hhb.kafka.offset.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author: 
 * @date: 2020-08-19 20:51
 **/
public class KafkaConsumerAuto {

/**
   * kafka消費者不是線程安全的
   */
  private final KafkaConsumer<String, String> consumer;

  private ExecutorService executorService;


  public KafkaConsumerAuto() {
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
      // 打開自動提交
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      props.put("auto.commit.interval.ms", "100");
      props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      consumer = new KafkaConsumer<String, String>(props);
      // 訂閱主題
      consumer.subscribe(Collections.singleton("tp_demo_02"));
  }

  public void execute() throws InterruptedException {
      executorService = Executors.newFixedThreadPool(2);
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(2_000);
          if (null != records) {
              executorService.submit(new ConsumerThreadAuto(records,
                      consumer));
          }
          Thread.sleep(1000);
      }
  }

  public void shutdown() {
      try {
          if (consumer != null) {
              consumer.close();
          }
          if (executorService != null) {
              executorService.shutdown();
          }
          if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
              System.out.println("關(guān)閉線程池超時闰歪。嚎研。蓖墅。");
          }
      } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
      }
  }
}

分區(qū)

副本機制

副本機制.png

Kafka在一定數(shù)量的服務(wù)器上對主題分區(qū)進行復(fù)制库倘。 當(dāng)集群中的一個broker宕機后系統(tǒng)可以自動故障轉(zhuǎn)移到其他可用的副本上临扮,不會造成數(shù)據(jù)丟失。

--replication-factor 3 1leader+2follower

  1. 將復(fù)制因子為1的未復(fù)制主題稱為復(fù)制主題教翩。
  2. 主題的分區(qū)是復(fù)制的最小單元杆勇。
  3. 在非故障情況下,Kafka中的每個分區(qū)都有一個Leader副本和零個或多個Follower副本饱亿。
  4. 包括Leader副本在內(nèi)的副本總數(shù)構(gòu)成復(fù)制因子蚜退。
  5. 所有讀取和寫入都由Leader副本負責(zé)。
  6. 通常彪笼,分區(qū)比broker多钻注,并且Leader分區(qū)在broker之間平均分配。

Follower分區(qū)像普通的Kafka消費者一樣配猫,消費來自Leader分區(qū)的消息幅恋,并將其持久化到自己的日志中。允許Follower對日志條目拉取進行批處理泵肄。

同步節(jié)點定義:

  1. 節(jié)點必須能夠維持與ZooKeeper的會話(通過ZooKeeper的心跳機制)
  2. 對于Follower副本分區(qū)捆交,它復(fù)制在Leader分區(qū)上的寫入,并且不要延遲太多

Kafka提供的保證是腐巢,只要有至少一個同步副本處于活動狀態(tài)品追,提交的消息就不會丟失。

宕機如何恢復(fù)

  • 少部分副本宕機

    當(dāng)leader宕機了冯丙,會從follower選擇一個作為leader肉瓦。當(dāng)宕機的重新恢復(fù)時,會把之前commit的數(shù) 據(jù)清空银还,重新從leader里pull數(shù)據(jù)风宁。

  • 全部副本宕機
    當(dāng)全部副本宕機了有兩種恢復(fù)方式

    • 等待ISR中的一個恢復(fù)后,并選它作為leader蛹疯。(等待時間較長戒财,降低可用性)
    • 選擇第一個恢復(fù)的副本作為新的leader,無論是否在ISR中捺弦。(并未包含之前l(fā)eader commit的 數(shù)據(jù)饮寞,因此造成數(shù)據(jù)丟失)

Leader選舉

下圖中分區(qū)P1的Leader是0,ISR是0和1 分區(qū)P2的Leader是2列吼,ISR是1和2 分區(qū)P3的Leader是1幽崩,ISR是0,1,2。


Leader選舉.png

生產(chǎn)者和消費者的請求都由Leader副本來處理寞钥。Follower副本只負責(zé)消費Leader副本的數(shù)據(jù)和 Leader保持同步慌申。對于P1,如果0宕機會發(fā)生什么? Leader副本和Follower副本之間的關(guān)系并不是固定不變的,在Leader所在的broker發(fā)生故障的時候蹄溉,就需要進行分區(qū)的Leader副本和Follower副本之間的切換咨油,需要選舉Leader副本。

如何選舉:

如果某個分區(qū)所在的服務(wù)器出了問題柒爵,不可用役电,kafka會從該分區(qū)的其他的副本中選擇一個作為新 的Leader。之后所有的讀寫就會轉(zhuǎn)移到這個新的Leader上∶拚停現(xiàn)在的問題是應(yīng)當(dāng)選擇哪個作為新的 Leader法瑟。只有那些跟Leader保持同步的Follower才應(yīng)該被選作新的Leader。Kafka會在Zookeeper上針對每個Topic維護一個稱為ISR(in-sync replica唁奢,已同步的副本)的集 合霎挟,該集合中是一些分區(qū)的副本。只有當(dāng)這些副本都跟Leader中的副本同步了之后麻掸,kafka才會認為消息已提交氓扛,并反饋給消息的生 產(chǎn)者。如果這個集合有增減论笔,kafka會更新zookeeper上的記錄采郎。如果某個分區(qū)的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader狂魔。 顯然通過ISR蒜埋,kafka需要的冗余度較低,可以容忍的失敗數(shù)比較高最楷。 假設(shè)某個topic有N+1個副本整份,kafka可以容忍N個服務(wù)器不可用。

為什么不用少數(shù)服從多數(shù)的方法

少數(shù)服從多數(shù)是一種比較常見的一致性算發(fā)和Leader選舉法籽孙。 它的含義是只有超過半數(shù)的副本同步了烈评,系統(tǒng)才會認為數(shù)據(jù)已同步; 選擇Leader時也是從超過半數(shù)的同步的副本中選擇。 這種算法需要較高的冗余度犯建,跟Kafka比起來讲冠,浪費資源。 譬如只允許一臺機器失敗适瓦,需要有三個副本;而如果只容忍兩臺機器失敗竿开,則需要五個副本。 而kafka的ISR集合方法玻熙,分別只需要兩個和三個副本否彩。

如果所有的ISR副本都失敗了怎么辦?

此時有兩種方法可選,

  1. 等待ISR集合中的副本復(fù)活嗦随,

  2. 選擇任何一個立即可用的副本列荔,而這個副本不一定是在ISR集合中。

    需要設(shè)置 unclean.leader.election.enable=true

這兩種方法各有利弊,實際生產(chǎn)中按需選擇贴浙。 如果要等待ISR副本復(fù)活筷转,雖然可以保證一致性,但可能需要很長時間悬而。而如果選擇立即可用的副本,則很可能該副本并不一致锭汛。

總結(jié):

Kafka中Leader分區(qū)選舉笨奠,通過維護一個動態(tài)變化的ISR集合來實現(xiàn),一旦Leader分區(qū)丟掉唤殴,則從 ISR中隨機挑選一個副本做新的Leader分區(qū)般婆。如果ISR中的副本都丟失了,則:

  1. 可以等待ISR中的副本任何一個恢復(fù)朵逝,接著對外提供服務(wù)蔚袍,需要時間等待。

  2. 從OSR中選出一個副本做Leader副本配名,此時會造成數(shù)據(jù)丟失

分區(qū)重新分配

向已經(jīng)部署好的Kafka集群里面添加機器啤咽,我們需要從已經(jīng)部署好的Kafka節(jié)點中復(fù)制相應(yīng)的配置文 件,然后把里面的broker id修改成全局唯一的渠脉,最后啟動這個節(jié)點即可將它加入到現(xiàn)有Kafka集群中宇整。

問題:新添加的Kafka節(jié)點并不會自動地分配數(shù)據(jù),無法分擔(dān)集群的負載芋膘,除非我們新建一個 topic鳞青。

需要手動將部分分區(qū)移到新添加的Kafka節(jié)點上,Kafka內(nèi)部提供了相關(guān)的工具來重新分布某個 topic的分區(qū)为朋。在重新分布topic分區(qū)之前臂拓,我們先來看看現(xiàn)在topic的各個分區(qū)的分布位置

創(chuàng)建主題:

kafka-topics.sh--zookeeper hhb:2181/myKafka --create --topic tp_re_01 --partitions 5 --replication-factor 1

查看主題信息,所有的主題分區(qū)都集中在broker0上∠按纾現(xiàn)在想在另一個機器上搭建一個Kafka胶惰,組成新的kafka機器

 kafka-topics.sh --zookeeper --describe --topic tp_re_01

把當(dāng)前kafka文件拷貝到linux121

scp -r kafka_2.12-1.0.2 linux121:/mnt/module/

在linux121上操作:

cd /mnt/module/kafka_2.12-1.0.2/
vim /etc/profile

#配置kafka
##KAFKA_HOME
export KAFKA_HOME=/mnt/module/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

cd config/
vim server.properties
##修改broker.id
broker.id = 1
##修改zookeeper地址
zookeeper.connect=hhb:2181/myKafka

# 保存后退出
cd ../ 
rm -rf logs/*
rm -rf kafka-logs/*

啟動kafka

kafka-server-start.sh -daemon  /mnt/module/kafka_2.12-1.0.2/config/server.properties

查看kafka是否已經(jīng)注冊到集群。進入到hhb服務(wù)器霞溪,進入到zookeeper中

zkCli.sh
## 查看該節(jié)點下是否有一個數(shù)組童番,包含兩個元素
ls /myKafka/brokers/ids
# 獲取該節(jié)點的信息
get /myKafka/cluster/id

##返回:
{"version":"1","id":"UozyfTtKTrmHoPEeF8pD3Q"}
cZxid = 0x1a
ctime = Tue Aug 11 21:29:37 CST 2020
mZxid = 0x1a
mtime = Tue Aug 11 21:29:37 CST 2020
pZxid = 0x1a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0

該ID:UozyfTtKTrmHoPEeF8pD3Q就是集群ID,可以再去linux121的啟動日記里面看到威鹿,該服務(wù)啟動的clusterId也是UozyfTtKTrmHoPEeF8pD3Q剃斧。證明該節(jié)點已經(jīng)加入到集群。

現(xiàn)在集群上有兩個節(jié)點忽你,但是剛才創(chuàng)建的tp_re_01的topic的5個分區(qū)都在broker0幼东,目標:將3、4分區(qū)的Leader放到broker1上。

在linux121 ~目錄下:

vim topic-to-move.json

{
    "topics": [
        {
            "topic": "tp_re_01"
        }
    ],
    "version": 1
}

## 保存退出

使用kafka-reassign-partitions.sh來處理分區(qū)移動:

參數(shù):

參數(shù) 解釋
--generate 只是生成一個移動分區(qū)的計劃根蟹,并不會執(zhí)行
--execute 真正的執(zhí)行計劃
--verify 驗證計劃是否執(zhí)行成功

生成計劃:

kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --topics-to-move-json-file topic-to-move.json --broker-list "0,1" --generate
計劃規(guī)劃.png

將系統(tǒng)生成的執(zhí)行計劃脓杉,放到topic-to-move-exec.json

vim topic-to-move-exec.json


{
    "version": 1,
    "partitions": [
        {
            "topic": "tp_re_01",
            "partition": 4,
            "replicas": [
                0
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "tp_re_01",
            "partition": 1,
            "replicas": [
                1
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "tp_re_01",
            "partition": 2,
            "replicas": [
                0
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "tp_re_01",
            "partition": 3,
            "replicas": [
                1
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "tp_re_01",
            "partition": 0,
            "replicas": [
                0
            ],
            "log_dirs": [
                "any"
            ]
        }
    ]
}


## 執(zhí)行該計劃
kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --reassignment-json-file topic-to-move-exec.json  --execute

此時可以看到分區(qū)的變化,1简逮、3的分區(qū)移動到了broker1上球散。我們的目標是將3、4分區(qū)移動到broker1上,所以很準備一個新的計劃:my-topic-to-move-exec.json

vim my-topic-to-move-exec.json

{
    "version": 1,
    "partitions": [
        {
            "topic": "tp_re_01",
            "partition": 4,
            "replicas": [
                1
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "tp_re_01",
            "partition": 1,
            "replicas": [
                0
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "tp_re_01",
            "partition": 2,
            "replicas": [
                0
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "tp_re_01",
            "partition": 3,
            "replicas": [
                1
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "tp_re_01",
            "partition": 0,
            "replicas": [
                0
            ],
            "log_dirs": [
                "any"
            ]
        }
    ]
}


### 再次執(zhí)行計劃:
kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --reassignment-json-file my-topic-to-move-exec.json  --execute

驗證計劃執(zhí)行結(jié)果

kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --reassignment-json-file my-topic-to-move-exec.json  --verify

自動在平衡

我們可以在新建主題的時候散庶,手動指定主題各個Leader分區(qū)以及Follower分區(qū)的分配情況蕉堰,即什么分區(qū)副本在哪個broker節(jié)點上。隨著系統(tǒng)的運行悲龟,broker的宕機重啟屋讶,會引發(fā)Leader分區(qū)和Follower分區(qū)的角色轉(zhuǎn)換,最后可能 Leader大部分都集中在少數(shù)幾臺broker上须教,由于Leader負責(zé)客戶端的讀寫操作皿渗,此時集中Leader分區(qū) 的少數(shù)幾臺服務(wù)器的網(wǎng)絡(luò)I/O,CPU轻腺,以及內(nèi)存都會很緊張乐疆。

Leader和Follower的角色轉(zhuǎn)換會引起Leader副本在集群中分布的不均衡,此時我們需要一種手 段贬养,讓Leader的分布重新恢復(fù)到一個均衡的狀態(tài)诀拭。

創(chuàng)建主題,直接指定該主題每個分區(qū)在哪個broker中

kafka-topics.sh --zookeeper hhb:2181/myKafka --create --topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"

上面的命令“0:1,1:0,0:1”表示一共創(chuàng)建三個分區(qū)煤蚌,第一個分區(qū)“0:1”:在broker0和broker1上耕挨,Leader在broker0上,第二個“1:0”表示尉桩,在broker0和broker1上筒占,Leader在broker1上,第一位就是Leader副本,2:1:3:0 表示有4個副本蜘犁,Leader副本在broker.id 為2 的服務(wù)器翰苫。

然后模擬broker0宕機的情況:

## 找到kafka的進程ID
jps
### 直接刪除kafka進程
kill -9 755

## 此時上另一個服務(wù)器上查看該topic詳細信息≌獬龋可以看到所有的分區(qū)都在broker1上
kafka-topics.sh --zookeeper hhb:2181/myKafka --describe --topic tp_demo_03
## 重啟broker0
kafka-server-start.sh -daemon  /mnt/module/kafka_2.12-1.0.2/config/server.properties
## 再次查看topic信息奏窑,發(fā)現(xiàn)所有分區(qū)還是都在broker1
kafka-topics.sh --zookeeper hhb:2181/myKafka --describe --topic tp_demo_03
# broker恢復(fù)了,但是Leader的分配并沒有變化屈扎,還是處于Leader切換后的分配情況埃唯。

是否有一種方式,可以讓Kafka自動幫我們進行修改?改為初始的副本分配? 此時鹰晨,用到了Kafka提供的自動再均衡腳本: kafka-preferred-replica-election.sh 先看介紹:


kafka-preferred-replica-election.png

該工具會讓每個分區(qū)的Leader副本分配在合適的位置墨叛,讓Leader分區(qū)和Follower分區(qū)在服務(wù)器之間均衡分配.如果該腳本僅指定zookeeper地址止毕,則會對集群中所有的主題進行操作,自動再平衡漠趁。

  1. 該topic所有的分區(qū)進行再平衡扁凛,恢復(fù)到新建分區(qū)時候的分配

    kafka-preferred-replica-election.sh --zookeeper localhost:2181/myKafka
    

    之所以是這樣的分配,是因為我們在創(chuàng)建主題的時候:--replica-assignment"0:1,1:0,0:1"闯传,在逗號分割的每個數(shù)值對中排在前面的是Leader分區(qū)谨朝,后面的是副本分區(qū)。那么所謂的preferred replica甥绿,就是排在前面的數(shù)字就是Leader副本應(yīng)該在的brokerId字币。

  2. 重平衡制定分區(qū)

    • 先創(chuàng)建一個preferred-replica.json文件,在這只恢復(fù)tp_demo_03的0號分區(qū),如果是多個妹窖,可是參考上圖的解釋

      {
          "partitions": [
              {
                  "topic": "tp_demo_03",
                  "partition": 0
              }
          ]
      }
      
    • 執(zhí)行命令

      kafka-preferred-replica-election.sh --zookeeper localhost:2181/myKafka --path-to-json-file preferred-replica.json
      

重新查看該topic,會發(fā)現(xiàn)收叶,分區(qū)0已經(jīng)恢復(fù)到新建的模樣骄呼。

修改分區(qū)副本

實際項目中,我們可能由于主題的副本因子設(shè)置的問題判没,需要重新設(shè)置副本因子 或者由于集群的擴展蜓萄,需要重新設(shè)置副本因子。 topic一旦使用又不能輕易刪除重建澄峰,因此動態(tài)增加副本因子就成為最終的選擇嫉沽。

說明:kafka 1.0版本配置文件默認沒有default.replication.factor=x, 因此如果創(chuàng)建topic時俏竞,不指定replication-factor 時候绸硕, 默認副本因子為1. 我們可以在自己的server.properties中配置上常用的副本因子,省去手動調(diào)整。例如設(shè)置default.replication.factor=3魂毁, 詳細內(nèi)容可參考官方文檔

原因分析:

假設(shè)我們有2個kafka broker分別broker0玻佩,broker1。

  1. 當(dāng)我們創(chuàng)建的topic有2個分區(qū)partition時并且replication-factor為1席楚,基本上一個broker上一 個分區(qū)咬崔。當(dāng)一個broker宕機了,該topic就無法使用了烦秩,因為兩個個分區(qū)只有一個能用垮斯。
  2. 當(dāng)我們創(chuàng)建的topic有3個分區(qū)partition時并且replication-factor為2時,可能分區(qū)數(shù)據(jù)分布情 況是 broker0只祠, partiton0兜蠕,partiton1,partiton2抛寝, broker1牺氨, partiton1狡耻,partiton0, partiton2猴凹,每個分區(qū)有一個副本夷狰,當(dāng)其中一個broker宕機了,kafka集群還能完整湊出該topic的兩個分 區(qū)郊霎,例如當(dāng)broker0宕機了沼头,可以通過broker1組合出topic的兩個分區(qū)。

創(chuàng)建主題:

kafka-topics.sh --zookeeper hhb:2181/myKafka --create --topic tp_re_02 --partitions 3 --replication-factor 1

查看主題細節(jié):

kafka-topics.sh --zookeeper hhb:2181/myKafka --describe --topic tp_re_02

修改副本因子:錯誤

kafka-topics.sh --zookeeper hhb:2181/myKafka --alter --topic tp_re_02 --replication-factor 2

使用kafka-reassign-partitions.sh修改副本因子

### 創(chuàng)建increment-replication-factor.json
vim increment-replication-factor.json
{
  "version":1,
  "partitions":[
      {"topic":"tp_re_02","partition":0,"replicas":[0,1]},
      {"topic":"tp_re_02","partition":1,"replicas":[0,1]},
      {"topic":"tp_re_02","partition":2,"replicas":[1,0]}
  ] 
}

執(zhí)行分配

kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --reassignment-json-file increase-replication- factor.json --execute

再次查看主題

kafka-topics.sh --zookeeper hhb:2181/myKafka --describe --topic tp_re_02

分區(qū)分配策略

分區(qū)分配策略.png

在Kafka中书劝,每個Topic會包含多個分區(qū)进倍,默認情況下一個分區(qū)只能被一個消費組下面的一個消費者 消費,這里就產(chǎn)生了分區(qū)分配的問題购对。Kafka中提供了多重分區(qū)分配算法(PartitionAssignor)的實現(xiàn):RangeAssignor猾昆、RoundRobinAssignor、StickyAssignor骡苞。

RangeAssignor

PartitionAssignor接口用于用戶定義實現(xiàn)分區(qū)分配算法垂蜗,以實現(xiàn)Consumer之間的分區(qū)分配。

消費組的成員訂閱它們感興趣的Topic并將這種訂閱關(guān)系傳遞給作為訂閱組協(xié)調(diào)者的Broker解幽。協(xié)調(diào) 者選擇其中的一個消費者來執(zhí)行這個消費組的分區(qū)分配并將分配結(jié)果轉(zhuǎn)發(fā)給消費組內(nèi)所有的消費者贴见。 Kafka默認采用RangeAssignor的分配算法。

RangeAssignor對每個Topic進行獨立的分區(qū)分配躲株。對于每一個Topic片部,首先對分區(qū)按照分區(qū)ID進行 數(shù)值排序,然后訂閱這個Topic的消費組的消費者再進行字典排序霜定,之后盡量均衡的將分區(qū)分配給消費 者档悠。這里只能是盡量均衡,因為分區(qū)數(shù)可能無法被消費者數(shù)量整除望浩,那么有一些消費者就會多分配到一 些分區(qū)站粟。


RangeAssignor1.png

大致算法如下:

assign(topic, consumers) {
  // 對分區(qū)和Consumer進行排序
  List<Partition> partitions = topic.getPartitions();
  sort(partitions);
  sort(consumers);
  // 計算每個Consumer分配的分區(qū)數(shù)
  int numPartitionsPerConsumer = partition.size() / consumers.size(); 
  // 額外有一些Consumer會多分配到分區(qū)
  int consumersWithExtraPartition = partition.size() % consumers.size();
  // 計算分配結(jié)果
  for (int i = 0, n = consumers.size(); i < n; i++) {
      // 第i個Consumer分配到的分區(qū)的index
      int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
      // 第i個Consumer分配到的分區(qū)數(shù)
      int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
      // 分裝分配結(jié)果
      assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
  }
}

RangeAssignor策略的原理是按照消費者總數(shù)和分區(qū)總數(shù)進行整除運算來獲得一個跨度,然后將分區(qū)按照跨度進行平均分配曾雕,以保證分區(qū)盡可能均勻地分配給所有的消費者奴烙。對于每一個Topic, RangeAssignor策略會將消費組內(nèi)所有訂閱這個Topic的消費者按照名稱的字典序排序剖张,然后為每個消 費者劃分固定的分區(qū)范圍切诀,如果不夠平均分配,那么字典序靠前的消費者會被多分配一個分區(qū)搔弄。

這種分配方式明顯的一個問題是隨著消費者訂閱的Topic的數(shù)量的增加幅虑,不均衡的問題會越來越嚴 重,比如上圖中4個分區(qū)3個消費者的場景顾犹,C0會多分配一個分區(qū)倒庵。如果此時再訂閱一個分區(qū)數(shù)為4的 Topic褒墨,那么C0又會比C1、C2多分配一個分區(qū)擎宝,這樣C0總共就比C1郁妈、C2多分配兩個分區(qū)了瓜饥,而且隨著 Topic的增加读规,這個情況會越來越嚴重堕油。

字典序靠前的消費組中的消費者比較“貪婪”昂灵。


RangeAssignor2.png
RoundRobinAssignor

RoundRobinAssignor的分配策略是將消費組內(nèi)訂閱的所有Topic的分區(qū)及所有消費者進行排序后盡 量均衡的分配(RangeAssignor是針對單個Topic的分區(qū)進行排序分配的)。如果消費組內(nèi)蝶锋,消費者訂 閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic)易稠,那么分配結(jié)果是盡量均衡的(消費者之 間分配到的分區(qū)數(shù)的差值不會超過1)脐帝。如果訂閱的Topic列表是不同的筋搏,那么分配結(jié)果是不保證“盡量 均衡”的仆百,因為某些消費者不參與一些Topic的分配。


RoundRobinAssignor1.png

相對于RangeAssignor奔脐,在訂閱多個Topic的情況下俄周,RoundRobinAssignor的方式能消費者之間盡 量均衡的分配到分區(qū)(分配到的分區(qū)數(shù)的差值不會超過1——RangeAssignor的分配策略可能隨著訂閱 的Topic越來越多,差值越來越大)帖族。

對于消費組內(nèi)消費者訂閱Topic不一致的情況:假設(shè)有兩個個消費者分別為C0和C1栈源,有2個Topic T1挡爵、T2竖般,分別擁有3和2個分區(qū),并且C0訂閱T1和T2茶鹃,C1訂閱T2涣雕,那么RoundRobinAssignor的分配結(jié)果如下:


RoundRobinAssignor2.png

看上去分配已經(jīng)盡量的保證均衡了,不過可以發(fā)現(xiàn)C0承擔(dān)了4個分區(qū)的消費而C1訂閱了T2一個分區(qū)闭翩,是不是把T2的P0交給C1消費能更加的均衡呢?

StickyAssignor

動機

盡管RoundRobinAssignor已經(jīng)在RangeAssignor上做了一些優(yōu)化來更均衡的分配分區(qū)挣郭,但是在一些情況下依舊會產(chǎn)生嚴重的分配偏差,比如消費組中訂閱的Topic列表不相同的情況下疗韵。更核心的問題是無論是RangeAssignor兑障,還是RoundRobinAssignor,當(dāng)前的分區(qū)分配算法都沒有考慮上一次的分配結(jié)果蕉汪。顯然流译,在執(zhí)行一次新的分配之前,如果能考慮到上一次分配的結(jié)果者疤,盡量少的調(diào)整分區(qū)分配的變動福澡,顯然是能節(jié)省很多開銷的。

目標

從字面意義上看驹马,Sticky是“粘性的”革砸,可以理解為分配結(jié)果是帶“粘性的”:

  1. 分區(qū)的分配盡量的均衡

  2. 每一次重分配的結(jié)果盡量與上一次分配結(jié)果保持一致

當(dāng)這兩個目標發(fā)生沖突時除秀,優(yōu)先保證第一個目標。第一個目標是每個分配算法都盡量嘗試去完成的算利,而第二個目標才真正體現(xiàn)出StickyAssignor特性的册踩。 我們先來看預(yù)期分配的結(jié)構(gòu),后續(xù)再具體分析StickyAssignor的算法實現(xiàn)笔时。 例如:

  • 有3個Consumer:C0棍好、C1、C2
  • 有4個Topic:T0允耿、T1借笙、T2、T3较锡,每個Topic有2個分區(qū)
  • 所有Consumer都訂閱了這4個分區(qū)

StickyAssignor的分配結(jié)果如下圖所示(增加RoundRobinAssignor分配作為對比):


StickyAssignor1.png

如果消費者1宕機业稼,則按照RoundRobin的方式分配結(jié)果如下:

打亂從新來過,輪詢分配:


StickyAssignor2.png

按照Sticky的方式:

僅對消費者1分配的分區(qū)進行重分配蚂蕴,紅線部分低散。最終達到均衡的目的。


StickyAssignor3.png

再舉一個例子:

  • 有3個Consumer:C0骡楼、C1熔号、C2
  • 3個Topic:T0、T1鸟整、T2引镊,它們分別有1、2篮条、3個分區(qū)
  • C0訂閱T0;C1訂閱T0弟头、T1;C2訂閱T0、T1涉茧、T2

分配結(jié)果如下圖所示:


StickyAssignor4.png

消費者0下線赴恨,則按照輪詢的方式分配:


StickyAssignor5.png

按照Sticky方式分配分區(qū),僅僅需要動的就是紅線部分伴栓,其他部分不動伦连。


StickyAssignor6.png

StickyAssignor分配方式的實現(xiàn)稍微復(fù)雜點兒,我們可以先理解圖示部分即可钳垮。感興趣的同學(xué)可以 研究一下惑淳。

自定義分配策略

自定義的分配策略必須要實現(xiàn)org.apache.kafka.clients.consumer.internals.PartitionAssignor接 口。PartitionAssignor接口的定義如下:

public interface PartitionAssignor {
    PartitionAssignor.Subscription subscription(Set<String> var1);
    Map<String, PartitionAssignor.Assignment> assign(Cluster var1, Map<String, PartitionAssignor.Subscription> var2);
    void onAssignment(PartitionAssignor.Assignment var1);
    String name();
    public static class Assignment {
        private final List<TopicPartition> partitions;
        private final ByteBuffer userData;

    }
    public static class Subscription {
        private final List<String> topics;
        private final ByteBuffer userData;
    }
}

PartitionAssignor接口中定義了兩個內(nèi)部類:Subscription和Assignment扔枫。

Subscription類用來表示消費者的訂閱信息汛聚,類中有兩個屬性:topics和userData,分別表示消費者所訂閱topic列表和用戶自定義信息短荐。PartitionAssignor接口通過subscription()方法來設(shè)置消費者自身相關(guān)的Subscription信息倚舀,注意到此方法中只有一個參數(shù)topics叹哭,與Subscription類中的topics的相互呼應(yīng),但是并沒有有關(guān)userData的參數(shù)體現(xiàn)痕貌。為了增強用戶對分配結(jié)果的控制风罩,可以在subscription() 方法內(nèi)部添加一些影響分配的用戶自定義信息賦予userData,比如:權(quán)重舵稠、ip地址超升、host或者機架 (rack)等等。

再來說一下Assignment類哺徊,它是用來表示分配結(jié)果信息的室琢,類中也有兩個屬性:partitions和 userData,分別表示所分配到的分區(qū)集合和用戶自定義的數(shù)據(jù)落追∮危可以通過PartitionAssignor接口中的 onAssignment()方法是在每個消費者收到消費組leader分配結(jié)果時的回調(diào)函數(shù),例如在StickyAssignor 策略中就是通過這個方法保存當(dāng)前的分配方案轿钠,以備在下次消費組再平衡(rebalance)時可以提供分 配參考依據(jù)巢钓。

接口中的name()方法用來提供分配策略的名稱,對于Kafka提供的3種分配策略而言疗垛, RangeAssignor對應(yīng)的protocol_name為“range”症汹,RoundRobinAssignor對應(yīng)的protocol_name為 “roundrobin”,StickyAssignor對應(yīng)的protocol_name為“sticky”贷腕,所以自定義的分配策略中要注意命名 的時候不要與已存在的分配策略發(fā)生沖突背镇。這個命名用來標識分配策略的名稱,在后面所描述的加入消 費組以及選舉消費組leader的時候會有涉及花履。

真正的分區(qū)分配方案的實現(xiàn)是在assign()方法中芽世,方法中的參數(shù)metadata表示集群的元數(shù)據(jù)信息挚赊, 而subscriptions表示消費組內(nèi)各個消費者成員的訂閱信息诡壁,最終方法返回各個消費者的分配信息。

Kafka中還提供了一個抽象類 org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor荠割,它可以簡化 PartitionAssignor接口的實現(xiàn)妹卿,對assign()方法進行了實現(xiàn),其中會將Subscription中的userData信息 去掉后蔑鹦,在進行分配夺克。Kafka提供的3種分配策略都是繼承自這個抽象類。如果開發(fā)人員在自定義分區(qū)分 配策略時需要使用userData信息來控制分區(qū)分配的結(jié)果嚎朽,那么就不能直接繼承 AbstractPartitionAssignor這個抽象類铺纽,而需要直接實現(xiàn)PartitionAssignor接口。

package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; 
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class MyAssignor extends AbstractPartitionAssignor {
}

在使用時哟忍,消費者客戶端需要添加相應(yīng)的Properties參數(shù)狡门,示例如下:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,MyAssignor.class.getName());
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末陷寝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子其馏,更是在濱河造成了極大的恐慌凤跑,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件叛复,死亡現(xiàn)場離奇詭異仔引,居然都是意外死亡,警方通過查閱死者的電腦和手機褐奥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門咖耘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人撬码,你說我怎么就攤上這事鲤看。” “怎么了耍群?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵义桂,是天一觀的道長。 經(jīng)常有香客問我蹈垢,道長慷吊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任曹抬,我火速辦了婚禮溉瓶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘谤民。我一直安慰自己堰酿,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布张足。 她就那樣靜靜地躺著触创,像睡著了一般。 火紅的嫁衣襯著肌膚如雪为牍。 梳的紋絲不亂的頭發(fā)上哼绑,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天,我揣著相機與錄音碉咆,去河邊找鬼抖韩。 笑死,一個胖子當(dāng)著我的面吹牛疫铜,可吹牛的內(nèi)容都是我干的茂浮。 我是一名探鬼主播,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼席揽!你這毒婦竟也來了佃乘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤驹尼,失蹤者是張志新(化名)和其女友劉穎趣避,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體新翎,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡程帕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了地啰。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片愁拭。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖亏吝,靈堂內(nèi)的尸體忽然破棺而出岭埠,到底是詐尸還是另有隱情,我是刑警寧澤蔚鸥,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布惜论,位于F島的核電站,受9級特大地震影響止喷,放射性物質(zhì)發(fā)生泄漏馆类。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一弹谁、第九天 我趴在偏房一處隱蔽的房頂上張望乾巧。 院中可真熱鬧,春花似錦预愤、人聲如沸沟于。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽旷太。三九已至,卻和暖如春向图,著一層夾襖步出監(jiān)牢的瞬間泳秀,已是汗流浹背标沪。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工榄攀, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人金句。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓檩赢,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子贞瞒,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內(nèi)容