Kafka 原理以及分區(qū)分配策略剖析

一癣疟、簡介

Apache Kafka 是一個分布式的流處理平臺(分布式的基于發(fā)布/訂閱模式的消息隊列【Message Queue】)吠卷。

流處理平臺有以下3個特性:

  • 可以讓你發(fā)布和訂閱流式的記錄鳄哭。這一方面與消息隊列或者企業(yè)消息系統(tǒng)類似腐螟。

  • 可以儲存流式的記錄窑业,并且有較好的容錯性钦幔。

  • 可以在流式記錄產(chǎn)生時就進行處理。

1.1 消息隊列的兩種模式

1.1.1 點對點模式

生產(chǎn)者將消息發(fā)送到queue中常柄,然后消費者從queue中取出并且消費消息鲤氢。消息被消費以后搀擂,queue中不再存儲,所以消費者不可能消費到已經(jīng)被消費的消息卷玉。Queue支持存在多個消費者哨颂,但是對一個消息而言,只能被一個消費者消費相种。

image

1.1.2 發(fā)布/訂閱模式

生產(chǎn)者將消息發(fā)布到topic中威恼,同時可以有多個消費者訂閱該消息。和點對點方式不同寝并,發(fā)布到topic的消息會被所有訂閱者消費箫措。

[圖片上傳失敗...(image-80c774-1610941529403)]

1.2 Kafka 適合什么樣的場景

它可以用于兩大類別的應用:

  • 構(gòu)造實時流數(shù)據(jù)管道,它可以在系統(tǒng)或應用之間可靠地獲取數(shù)據(jù)衬潦。(相當于message queue)斤蔓。

  • 構(gòu)建實時流式應用程序,對這些流數(shù)據(jù)進行轉(zhuǎn)換或者影響镀岛。(就是流處理弦牡,通過kafka stream topic和topic之間內(nèi)部進行變化)。

為了理解Kafka是如何做到以上所說的功能漂羊,從下面開始驾锰,我們將深入探索Kafka的特性。

首先是一些概念:

  • Kafka作為一個集群走越,運行在一臺或者多臺服務器上椭豫。

  • Kafka 通過 topic 對存儲的流數(shù)據(jù)進行分類。

  • 每條記錄中包含一個key买喧,一個value和一個timestamp(時間戳)捻悯。

1.3 主題和分區(qū)

Kafka的消息通過主題(Topic)進行分類,就好比是數(shù)據(jù)庫的表淤毛,或者是文件系統(tǒng)里的文件夾。主題可以被分為若干個分區(qū)(Partition)算柳,一個分區(qū)就是一個提交日志低淡。消息以追加的方式寫入分區(qū),然后以先進先出的順序讀取瞬项。注意蔗蹋,由于一個主題一般包含幾個分區(qū),因此無法在整個主題范圍內(nèi)保證消息的順序囱淋,但可以保證消息在單個分區(qū)內(nèi)的順序猪杭。主題是邏輯上的概念,在物理上妥衣,一個主題是橫跨多個服務器的皂吮。

image

Kafka 集群保留所有發(fā)布的記錄(無論他們是否已被消費)戒傻,并通過一個可配置的參數(shù)——保留期限來控制(可以同時配置時間和消息大小,以較小的那個為準)蜂筹。舉個例子需纳, 如果保留策略設置為2天,一條記錄發(fā)布后兩天內(nèi)艺挪,可以隨時被消費不翩,兩天過后這條記錄會被拋棄并釋放磁盤空間。

有時候我們需要增加分區(qū)的數(shù)量麻裳,比如為了擴展主題的容量口蝠、降低單個分區(qū)的吞吐量或者要在單個消費者組內(nèi)運行更多的消費者(因為一個分區(qū)只能由消費者組里的一個消費者讀取)津坑。從消費者的角度來看妙蔗,基于鍵的主題添加分區(qū)是很困難的,因為分區(qū)數(shù)量改變国瓮,鍵到分區(qū)的映射也會變化灭必,所以對于基于鍵的主題來說,建議在一開始就設置好分區(qū)乃摹,避免以后對其進行調(diào)整禁漓。

(注意:不能減少分區(qū)的數(shù)量,因為如果刪除了分區(qū)孵睬,分區(qū)里面的數(shù)據(jù)也一并刪除了播歼,導致數(shù)據(jù)不一致。如果一定要減少分區(qū)的數(shù)量掰读,只能刪除topic重建)

1.4 生產(chǎn)者和消費者

生產(chǎn)者(發(fā)布者)創(chuàng)建消息秘狞,一般情況下,一個消息會被發(fā)布到一個特定的主題上蹈集。生產(chǎn)者在默認情況下把消息均衡的分布到主題的所有分區(qū)上烁试,而并不關(guān)心特定消息會被寫入哪個分區(qū)。不過拢肆,生產(chǎn)者也可以把消息直接寫到指定的分區(qū)减响。這通常通過消息鍵和分區(qū)器來實現(xiàn),分區(qū)器為鍵生成一個散列值郭怪,并將其映射到指定的分區(qū)上支示。生產(chǎn)者也可以自定義分區(qū)器,根據(jù)不同的業(yè)務規(guī)則將消息映射到分區(qū)鄙才。

消費者(訂閱者)讀取消息颂鸿,消費者可以訂閱一個或者多個主題,并按照消息生成的順序讀取它們攒庵。消費者通過檢查消息的偏移量來區(qū)分已經(jīng)讀取過的消息嘴纺。偏移量是一種元數(shù)據(jù)败晴,它是一個不斷遞增的整數(shù)值,在創(chuàng)建消息時颖医,kafka會把它添加到消息里位衩。在給定的分區(qū)里,每個消息的偏移量都是唯一的熔萧。消費者把每個分區(qū)最后讀取的消息偏移量保存在zookeeper或者kafka上糖驴,如果消費者關(guān)閉或者重啟,它的讀取狀態(tài)不會丟失佛致。

消費者是消費者組的一部分贮缕,也就是說,會有一個或者多個消費共同讀取一個主題俺榆。消費者組保證每個分區(qū)只能被同一個組內(nèi)的一個消費者使用感昼。如果一個消費者失效,群組里的其他消費者可以接管失效消費者的工作罐脊。

image

1.5 broker和集群

broker:一個獨立的kafka服務器被稱為broker定嗓。broker接收來自生產(chǎn)者的消息,為消息設置偏移量萍桌,并提交消息到磁盤保存宵溅。broker為消費者提供服務,對讀取分區(qū)的請求作出相應上炎,返回已經(jīng)提交到磁盤上的消息恃逻。

集群:交給同一個zookeeper集群來管理的broker節(jié)點就組成了kafka的集群。

broker是集群的組成部分藕施,每個集群都有一個broker同時充當集群控制器的角色寇损。控制器負責管理工作裳食,包括將分區(qū)分配給broker和監(jiān)控broker矛市。在broker中,一個分區(qū)從屬于一個broker诲祸,該broker被稱為分區(qū)的首領(lǐng)尘盼。一個分區(qū)可以分配給多個broker(Topic設置了多個副本的時候),這時會發(fā)生分區(qū)復制烦绳。如下圖:

image

broker如何處理請求:broker會在它所監(jiān)聽的每個端口上運行一個Acceptor線程,這個線程會創(chuàng)建一個連接并把它交給Processor線程去處理配紫。Processor線程(也叫網(wǎng)絡線程)的數(shù)量是可配的径密,Processor線程負責從客戶端獲取請求信息,把它們放進請求隊列躺孝,然后從響應隊列獲取響應信息享扔,并發(fā)送給客戶端底桂。如下圖所示:

image

生產(chǎn)請求和獲取請求都必須發(fā)送給分區(qū)的首領(lǐng)副本(分區(qū)Leader)。如果broker收到一個針對特定分區(qū)的請求惧眠,而該分區(qū)的首領(lǐng)在另外一個broker上籽懦,那么發(fā)送請求的客戶端會收到一個“非分區(qū)首領(lǐng)”的錯誤響應。Kafka客戶端要自己負責把生產(chǎn)請求和獲取請求發(fā)送到正確的broker上氛魁。

客戶端如何知道該往哪里發(fā)送請求呢暮顺?客戶端使用了另外一種請求類型——元數(shù)據(jù)請求。這種請求包含了客戶端感興趣的主題列表秀存,服務器的響應消息里指明了這些主題所包含的分區(qū)捶码、每個分區(qū)都有哪些副本,以及哪個副本是首領(lǐng)或链。元數(shù)據(jù)請求可以發(fā)給任意一個broker惫恼,因為所有的broker都緩存了這些信息“难危客戶端緩存這些元數(shù)據(jù)祈纯,并且會定時從broker請求刷新這些信息。此外如果客戶端收到“非首領(lǐng)”錯誤叼耙,它會在嘗試重新發(fā)送請求之前腕窥,先刷新元數(shù)據(jù)。

image

1.6 Kafka 基礎(chǔ)架構(gòu)

image

二旬蟋、Kafka架構(gòu)深入

2.1 Kafka工作流程及文件存儲機制

2.1.1 工作流程

image

Kafka中消息是以topic進行分類的油昂,生產(chǎn)者生產(chǎn)消息,消費者消費消息倾贰,都是面向topic的冕碟。

Topic是邏輯上的概念,而partition(分區(qū))是物理上的概念匆浙,每個partition對應于一個log文件安寺,該log文件中存儲的就是producer生產(chǎn)的數(shù)據(jù)。Producer生產(chǎn)的數(shù)據(jù)會被不斷追加到該log文件末端首尼,且每條數(shù)據(jù)都有自己的offset挑庶。消費者組中的每個消費者,都會實時記錄自己消費到哪個offset软能,以便出錯恢復時迎捺,從上次的位置繼續(xù)消費。

2.1.2 文件存儲機制

image

由于生產(chǎn)者生產(chǎn)的消息會不斷追加到log文件末尾查排,為防止log文件過大導致數(shù)據(jù)定位效率低下凳枝,Kafka采取了分片和索引的機制,將每個partition分為多個segment。(由log.segment.bytes決定岖瑰,控制每個segment的大小叛买,也可通過log.segment.ms控制,指定多長時間后日志片段會被關(guān)閉)每個segment對應兩個文件——“.index”文件和“.log”文件蹋订。這些文件位于一個文件夾下率挣,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號。例如:bing這個topic有3個分區(qū)露戒,則其對應的文件夾為:bing-0椒功、bing-1和bing-2。

索引文件和日志文件命名規(guī)則:每個 LogSegment 都有一個基準偏移量玫锋,用來表示當前 LogSegment 中第一條消息的 offset蛾茉。偏移量是一個 64位的長整形數(shù),固定是20位數(shù)字撩鹿,長度未達到谦炬,用 0 進行填補。如下圖所示:

image

index和log文件以當前segment的第一條消息的offset命名节沦。index文件記錄的是數(shù)據(jù)文件的offset和對應的物理位置键思,正是有了這個index文件,才能對任一數(shù)據(jù)寫入和查看擁有O(1)的復雜度甫贯,index文件的粒度可以通過參數(shù)log.index.interval.bytes來控制吼鳞,默認是是每過4096字節(jié)記錄一條index。下圖為index文件和log文件的結(jié)構(gòu)示意圖:

image

查找message的流程(比如要查找offset為170417的message):

  1. 首先用二分查找確定它是在哪個Segment文件中叫搁,其中0000000000000000000.index為最開始的文件赔桌,第二個文件為0000000000000170410.index(起始偏移為170410+1 = 170411),而第三個文件為0000000000000239430.index(起始偏移為239430+1 = 239431)渴逻。所以這個offset = 170417就落在第二個文件中疾党。其他后續(xù)文件可以依此類推,以起始偏移量命名并排列這些文件惨奕,然后根據(jù)二分查找法就可以快速定位到具體文件位置雪位。

  2. 用該offset減去索引文件的編號,即170417 - 170410 = 7梨撞,也用二分查找法找到索引文件中等于或者小于7的最大的那個編號雹洗。可以看出我們能夠找到[4卧波,476]這組數(shù)據(jù)时肿,476即offset=170410 + 4 = 170414的消息在log文件中的偏移量。

  3. 打開數(shù)據(jù)文件(0000000000000170410.log)港粱,從位置為476的那個地方開始順序掃描直到找到offset為170417的那條Message嗜侮。

2.1.3 數(shù)據(jù)過期機制

當日志片段大小達到log.segment.bytes指定的上限(默認是1GB)或者日志片段打開時長達到log.segment.ms時,當前日志片段就會被關(guān)閉,一個新的日志片段被打開锈颗。如果一個日志片段被關(guān)閉,就開始等待過期咪惠。當前正在寫入的片段叫做活躍片段击吱,活躍片段永遠不會被刪除,所以如果你要保留數(shù)據(jù)1天遥昧,但是片段包含5天的數(shù)據(jù)覆醇,那么這些數(shù)據(jù)就會被保留5天,因為片段被關(guān)閉之前炭臭,這些數(shù)據(jù)無法被刪除永脓。

2.2 Kafka生產(chǎn)者

2.2.1 分區(qū)策略

  1. 多Partition分布式存儲,利于集群數(shù)據(jù)的均衡鞋仍。

  2. 并發(fā)讀寫常摧,加快讀寫速度。

  3. 加快數(shù)據(jù)恢復的速率:當某臺機器掛了威创,每個Topic僅需恢復一部分的數(shù)據(jù)落午,多機器并發(fā)汽烦。

分區(qū)的原則

  1. 指明partition的情況下沈矿,使用指定的partition;

  2. 沒有指明partition分尸,但是有key的情況下吸申,將key的hash值與topic的partition數(shù)進行取余得到partition值梗劫;

  3. 既沒有指定partition,也沒有key的情況下截碴,第一次調(diào)用時隨機生成一個整數(shù)(后面每次調(diào)用在這個整數(shù)上自增)梳侨,將這個值與topic可用的partition數(shù)取余得到partition值,也就是常說的round-robin算法隐岛。


public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        //key為空時猫妙,獲取一個自增的計數(shù),然后對分區(qū)做取模得到分區(qū)編號
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        // key不為空時聚凹,通過key的hash對分區(qū)取模(疑問:為什么這里不像上面那樣割坠,使用availablePartitions呢?)
        // 根據(jù)《Kafka權(quán)威指南》Page45理解:為了保證相同的鍵妒牙,總是能路由到固定的分區(qū)彼哼,如果使用可用分區(qū),那么因為分區(qū)數(shù)變化湘今,會導致相同的key敢朱,路由到不同分區(qū)
        // 所以如果要使用key來映射分區(qū),最好在創(chuàng)建主題的時候就把分區(qū)規(guī)劃好
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
 
private int nextValue(String topic) {
    //為每個topic維護了一個AtomicInteger對象,每次獲取時+1
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

2.2.2 數(shù)據(jù)可靠性保證

kafka提供了哪些方面的保證

  • kafka可以保證分區(qū)消息的順序拴签。如果使用同一個生產(chǎn)者往同一個分區(qū)寫入消息孝常,而且消息B在消息A之后寫入,那么kafka可以保證消息B的偏移量比消息A的偏移量大蚓哩,而且消費者會先讀取到消息A再讀取消息B构灸。

  • 只有當消息被寫入分區(qū)的所有副本時,它才被認為是“已提交”的岸梨。生產(chǎn)者可以選擇接收不同類型的確認喜颁,比如在消息被完全提交時的確認、在消息被寫入分區(qū)首領(lǐng)時的確認曹阔,或者在消息被發(fā)送到網(wǎng)絡時的確認半开。

  • 只要還有一個副本是活躍的,那么已經(jīng)提交的信息就不會丟失赃份。

  • 消費者只能讀取到已經(jīng)提交的消息寂拆。

復制

Kafka的復制機制和分區(qū)的多副本架構(gòu)是kafka可靠性保證的核心。把消息寫入多個副本可以使kafka在發(fā)生奔潰時仍能保證消息的持久性芥炭。

kafka的topic被分成多個分區(qū)漓库,分區(qū)是基本的數(shù)據(jù)塊。每個分區(qū)可以有多個副本园蝠,其中一個是首領(lǐng)渺蒿。所有事件都是發(fā)給首領(lǐng)副本,或者直接從首領(lǐng)副本讀取事件彪薛。其他副本只需要與首領(lǐng)副本保持同步茂装,并及時復制最新的事件。

Leader維護了一個動態(tài)的in-sync replica set(ISR)善延,意為和leader保持同步的follower集合少态。當ISR中的follower完成數(shù)據(jù)同步后,leader就會發(fā)送ack易遣。如果follower長時間未向leader同步數(shù)據(jù)彼妻,則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms參數(shù)設定豆茫。Leader不可用時侨歉,將會從ISR中選舉新的leader。滿足以下條件才能被認為是同步的:

  • 與zookeeper之間有一個活躍的會話揩魂,也就是說幽邓,它在過去的6s(可配置)內(nèi)向zookeeper發(fā)送過心跳。

  • 在過去的10s(可配置)內(nèi)從首領(lǐng)那里獲取過最新的數(shù)據(jù)火脉。

影響Kafka消息存儲可靠性的配置

[圖片上傳失敗...(image-5d433c-1610941529403)]

ack應答機制

對于某些不太重要的數(shù)據(jù)牵舵,對數(shù)據(jù)的可靠性要求不是很高柒啤,能夠容忍數(shù)據(jù)的少量丟失,所以沒有必要等ISR中的follower全部接收成功畸颅。所以Kafka提供了三種可靠性級別担巩,用戶可以根據(jù)對可靠性和延遲的要求進行權(quán)衡。acks:

  • ** 0:** producer不等待broker的ack重斑,這一操作提供了一個最低的延遲兵睛,broker一接收到還沒寫入磁盤就已經(jīng)返回,當broker故障時可能丟失數(shù)據(jù)窥浪;

  • ** 1:** producer等待leader的ack,partition的leader落盤成功后返回ack笛丙,如果在follower同步成功之前l(fā)eader故障漾脂,那么將會丟失數(shù)據(jù);

  • ** -1(all):**producer等待broker的ack胚鸯,partition的leader和ISR里的follower全部落盤成功后才返回ack骨稿。但是如果在follower同步完成后,broker發(fā)送ack之前姜钳,leader發(fā)生故障坦冠,那么會造成重復數(shù)據(jù)。(極端情況下也有可能丟數(shù)據(jù):ISR中只有一個Leader時哥桥,相當于1的情況)辙浑。

消費一致性保證

image

(1)follower故障

follower發(fā)生故障后會被臨時踢出ISR,待該follower恢復后拟糕,follower會讀取本地磁盤記錄的上次的HW判呕,并將log文件高于HW的部分截取掉,從HW開始向leader進行同步送滞。

等該follower的LEO大于等于該Partition的HW侠草,即follower追上leader之后,就可以重新加入ISR了犁嗅。

(2)leader故障

leader發(fā)生故障后边涕,會從ISR中選出一個新的leader,之后為了保證多個副本之間的數(shù)據(jù)一致性褂微,其余的follower會先將各自的log文件高于HW的部分截掉功蜓,然后從新的leader同步數(shù)據(jù)。

注意:這只能保證副本之間的數(shù)據(jù)一致性蕊梧,并不能保證數(shù)據(jù)不丟失或者不重復霞赫。

2.2.3 消息發(fā)送流程

Kafka 的producer 發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送過程中肥矢,涉及到了兩個線程——main線程和sender線程端衰,以及一個線程共享變量——RecordAccumulator叠洗。main線程將消息發(fā)送給RecordAccumulator,sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker旅东。

image

為了提高效率灭抑,消息被分批次寫入kafka。批次就是一組消息抵代,這些消息屬于同一個主題和分區(qū)腾节。(如果每一個消息都單獨穿行于網(wǎng)絡,會導致大量的網(wǎng)絡開銷荤牍,把消息分成批次傳輸可以減少網(wǎng)絡開銷案腺。不過要在時間延遲和吞吐量之間做出權(quán)衡:批次越大,單位時間內(nèi)處理的消息就越多康吵,單個消息的傳輸時間就越長)劈榨。批次數(shù)據(jù)會被壓縮,這樣可以提升數(shù)據(jù)的傳輸和存儲能力晦嵌,但要做更多的計算處理同辣。

相關(guān)參數(shù):

  • batch.size:只有數(shù)據(jù)積累到batch.size后,sender才會發(fā)送數(shù)據(jù)惭载。(單位:字節(jié)旱函,注意:不是消息個數(shù))。

  • linger.ms如果數(shù)據(jù)遲遲未達到batch.size描滔,sender等待 linger.ms之后也會發(fā)送數(shù)據(jù)棒妨。(單位:毫秒)。

  • client.id該參數(shù)可以是任意字符串伴挚,服務器會用它來識別消息的來源靶衍,還可用用在日志和配額指標里。

  • max.in.flight.requests.per.connection:該參數(shù)指定了生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息茎芋。它的值越高颅眶,就會占用越多的內(nèi)存,不過也會提升吞吐量田弥。把它設置為1可以保證消息時按發(fā)送的順序?qū)懭敕掌鞯奶涡铮词拱l(fā)生了重試。

2.3 Kafka消費者

2.3.1 消費方式

consumer采用pull(拉)的模式從broker中讀取數(shù)據(jù)偷厦。

push(推)模式很難適應消費速率不同的消費者商叹,因為消息發(fā)送速率是由broker決定的。它的目標是盡可能以最快的速度傳遞消息只泼,但是這樣容易造成consumer來不及處理消息剖笙,典型的表現(xiàn)就是拒絕服務以及網(wǎng)絡擁塞。而pull模式可以根據(jù)consumer的消費能力以適當?shù)乃俾氏M消息请唱。

pull模式的不足之處是弥咪,如果kafka沒有數(shù)據(jù)过蹂,消費者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)聚至。針對這一點酷勺,kafka的消費者在消費數(shù)據(jù)時會傳入一個時長參數(shù)timeout,如果當前沒有數(shù)據(jù)可消費扳躬,consumer會等待一段時間后再返回脆诉。

2.3.2 分區(qū)分配策略

一個consumer group中有多個consumer,一個topic有多個partition贷币,所以必然會涉及到partition的分配問題击胜,即確定哪個partition由哪個consumer來消費。Kafka提供了3種消費者分區(qū)分配策略:RangeAssigor役纹、RoundRobinAssignor潜的、StickyAssignor。

PartitionAssignor接口用于用戶定義實現(xiàn)分區(qū)分配算法字管,以實現(xiàn)Consumer之間的分區(qū)分配。消費組的成員訂閱它們感興趣的Topic并將這種訂閱關(guān)系傳遞給作為訂閱組協(xié)調(diào)者的Broker信不。協(xié)調(diào)者選擇其中的一個消費者來執(zhí)行這個消費組的分區(qū)分配并將分配結(jié)果轉(zhuǎn)發(fā)給消費組內(nèi)所有的消費者嘲叔。Kafka默認采用RangeAssignor的分配算法。

2.3.2.1 RangeAssignor

RangeAssignor對每個Topic進行獨立的分區(qū)分配抽活。對于每一個Topic硫戈,首先對分區(qū)按照分區(qū)ID進行排序,然后訂閱這個Topic的消費組的消費者再進行排序下硕,之后盡量均衡的將分區(qū)分配給消費者丁逝。這里只能是盡量均衡,因為分區(qū)數(shù)可能無法被消費者數(shù)量整除梭姓,那么有一些消費者就會多分配到一些分區(qū)霜幼。分配示意圖如下:

[圖片上傳失敗...(image-69cf84-1610941529403)]

分區(qū)分配的算法如下:


@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());
    //for循環(huán)對訂閱的多個topic分別進行處理
    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();
        List<String> consumersForTopic = topicEntry.getValue();
 
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;
        //對消費者進行排序
        Collections.sort(consumersForTopic);
        //計算平均每個消費者分配的分區(qū)數(shù)
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        //計算平均分配后多出的分區(qū)數(shù)
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
 
        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            //計算第i個消費者,分配分區(qū)的起始位置
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            //計算第i個消費者誉尖,分配到的分區(qū)數(shù)量
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

這種分配方式明顯的一個問題是隨著消費者訂閱的Topic的數(shù)量的增加罪既,不均衡的問題會越來越嚴重,比如上圖中4個分區(qū)3個消費者的場景铡恕,C0會多分配一個分區(qū)琢感。如果此時再訂閱一個分區(qū)數(shù)為4的Topic,那么C0又會比C1探熔、C2多分配一個分區(qū)驹针,這樣C0總共就比C1、C2多分配兩個分區(qū)了诀艰,而且隨著Topic的增加柬甥,這個情況會越來越嚴重饮六。分配結(jié)果:

[圖片上傳失敗...(image-665ea5-1610941529403)]

訂閱2個Topic,每個Topic4個分區(qū)暗甥,共3個Consumer

  • C0:[T0P0喜滨,T0P1,T1P0撤防,T1P1]

  • C1:[T0P2虽风,T1P2]

  • C2:[T0P3,T1P3]

2.3.2.2 RoundRobinAssignor

RoundRobinAssignor的分配策略是將消費組內(nèi)訂閱的所有Topic的分區(qū)及所有消費者進行排序后盡量均衡的分配(RangeAssignor是針對單個Topic的分區(qū)進行排序分配的)寄月。如果消費組內(nèi)辜膝,消費者訂閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那么分配結(jié)果是盡量均衡的(消費者之間分配到的分區(qū)數(shù)的差值不會超過1)漾肮。如果訂閱的Topic列表是不同的厂抖,那么分配結(jié)果是不保證“盡量均衡”的,因為某些消費者不參與一些Topic的分配克懊。

[圖片上傳失敗...(image-5a0d3e-1610941529403)]

以上兩個topic的情況忱辅,相比于之前RangeAssignor的分配策略,可以使分區(qū)分配的更均衡谭溉。不過考慮這種情況墙懂,假設有三個消費者分別為C0、C1扮念、C2损搬,有3個Topic T0、T1柜与、T2巧勤,分別擁有1、2弄匕、3個分區(qū)颅悉,并且C0訂閱T0,C1訂閱T0和T1粘茄,C2訂閱T0签舞、T1、T2柒瓣,那么RoundRobinAssignor的分配結(jié)果如下:

[圖片上傳失敗...(image-8f646f-1610941529403)]

看上去分配已經(jīng)盡量的保證均衡了儒搭,不過可以發(fā)現(xiàn)C2承擔了4個分區(qū)的消費而C1訂閱了T1,是不是把T1P1交給C1消費能更加的均衡呢芙贫?

2.3.2.3 StickyAssignor

StickyAssignor分區(qū)分配算法搂鲫,目的是在執(zhí)行一次新的分配時,能在上一次分配的結(jié)果的基礎(chǔ)上磺平,盡量少的調(diào)整分區(qū)分配的變動魂仍,節(jié)省因分區(qū)分配變化帶來的開銷拐辽。Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每一次分配變更相對上一次分配做最少的變動擦酌。其目標有兩點:

  • 分區(qū)的分配盡量的均衡俱诸。

  • 每一次重分配的結(jié)果盡量與上一次分配結(jié)果保持一致。

當這兩個目標發(fā)生沖突時赊舶,優(yōu)先保證第一個目標睁搭。第一個目標是每個分配算法都盡量嘗試去完成的,而第二個目標才真正體現(xiàn)出StickyAssignor特性的笼平。

StickyAssignor算法比較復雜园骆,下面舉例來說明分配的效果(對比RoundRobinAssignor),前提條件:

  • 有4個Topic:T0寓调、T1锌唾、T2、T3夺英,每個Topic有2個分區(qū)晌涕。

  • 有3個Consumer:C0、C1痛悯、C2渐排,所有Consumer都訂閱了這4個分區(qū)。

[圖片上傳失敗...(image-b5c6d2-1610941529403)]

上面紅色的箭頭代表的是有變動的分區(qū)分配灸蟆,可以看出,StickyAssignor的分配策略亲族,變動較小炒考。

2.3.3 offset的維護

由于Consumer在消費過程中可能會出現(xiàn)斷電宕機等故障,Consumer恢復后霎迫,需要從故障前的位置繼續(xù)消費斋枢,所以Consumer需要實時記錄自己消費到哪個位置,以便故障恢復后繼續(xù)消費知给。Kafka0.9版本之前瓤帚,Consumer默認將offset保存在zookeeper中,從0.9版本開始涩赢,Consumer默認將offset保存在Kafka一個內(nèi)置的名字叫_consumeroffsets的topic中戈次。默認是無法讀取的,可以通過設置consumer.properties中的exclude.internal.topics=false來讀取筒扒。

2.3.4 kafka高效讀寫數(shù)據(jù)(了解)

順序?qū)懘疟P

Kafka 的 producer生產(chǎn)數(shù)據(jù)怯邪,要寫入到log文件中,寫的過程是一直追加到文件末端花墩,為順序?qū)懶?shù)據(jù)表明澄步,同樣的磁盤,順序?qū)懩艿?00M/s和泌,而隨機寫只有100K/s村缸。這與磁盤的機械結(jié)構(gòu)有關(guān),順序?qū)懼钥煳涿ィ且驗槠涫∪チ舜罅看蓬^尋址的時間梯皿。

零拷貝技術(shù)

零拷貝主要的任務就是避免CPU將數(shù)據(jù)從一塊存儲拷貝到另外一塊存儲,主要就是利用各種零拷貝技術(shù)聋丝,避免讓CPU做大量的數(shù)據(jù)拷貝任務索烹,減少不必要的拷貝,或者讓別的組件來做這一類簡單的數(shù)據(jù)傳輸任務弱睦,讓CPU解脫出來專注于別的任務百姓。這樣就可以讓系統(tǒng)資源的利用更加有效。

參考文獻

  1. Kafka中文文檔

  2. [Kafka系列]之指定了一個offset,怎么查找到對應的消息况木?

  3. 尚硅谷 Kafka 教程( Kafka 框架快速入門)

  4. Kafka分區(qū)分配策略分析——重點:StickyAssignor

  5. Kafka 日志存儲

  6. 淺析Linux中的零拷貝技術(shù)

  7. 《Kafka權(quán)威指南》

作者:Li Xiaobing垒拢,來自vivo互聯(lián)網(wǎng)技術(shù)團隊

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市火惊,隨后出現(xiàn)的幾起案子求类,更是在濱河造成了極大的恐慌,老刑警劉巖屹耐,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尸疆,死亡現(xiàn)場離奇詭異,居然都是意外死亡惶岭,警方通過查閱死者的電腦和手機寿弱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來按灶,“玉大人症革,你說我怎么就攤上這事⊙炫裕” “怎么了噪矛?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長铺罢。 經(jīng)常有香客問我艇挨,道長,這世上最難降的妖魔是什么韭赘? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任雷袋,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘楷怒。我一直安慰自己蛋勺,他們只是感情好,可當我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布鸠删。 她就那樣靜靜地躺著抱完,像睡著了一般。 火紅的嫁衣襯著肌膚如雪刃泡。 梳的紋絲不亂的頭發(fā)上巧娱,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天,我揣著相機與錄音烘贴,去河邊找鬼禁添。 笑死,一個胖子當著我的面吹牛桨踪,可吹牛的內(nèi)容都是我干的老翘。 我是一名探鬼主播,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼锻离,長吁一口氣:“原來是場噩夢啊……” “哼铺峭!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起汽纠,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤卫键,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后虱朵,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體莉炉,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年碴犬,在試婚紗的時候發(fā)現(xiàn)自己被綠了呢袱。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡翅敌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出惕蹄,到底是詐尸還是另有隱情蚯涮,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布卖陵,位于F島的核電站遭顶,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏泪蔫。R本人自食惡果不足惜棒旗,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧铣揉,春花似錦饶深、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至朽合,卻和暖如春俱两,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背曹步。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工宪彩, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人讲婚。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓尿孔,卻偏偏與公主長得像,于是被迫代替她去往敵國和親磺樱。 傳聞我的和親對象是個殘疾皇子纳猫,可洞房花燭夜當晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容