這可能是最詳細(xì)的Kafka應(yīng)用了

Kafka

kafka是什么?kafka僅僅是屬于消息 中間件嗎集惋?

kafka在設(shè)計(jì)之初的時(shí)候 開(kāi)發(fā)人員們?cè)诔讼⒅虚g件以外孕似,還想吧kafka設(shè)計(jì)為一個(gè)能夠存儲(chǔ)數(shù)據(jù)的系統(tǒng),有點(diǎn)像常見(jiàn)的非關(guān)系型數(shù)據(jù)庫(kù)刮刑,比如說(shuō)NoSql等喉祭。除此之外 還希望kafka能支持持續(xù)變化养渴,不斷增長(zhǎng)的數(shù)據(jù)流, 可以發(fā)布 和訂閱數(shù)據(jù)流,還可以對(duì)于這些數(shù)據(jù)進(jìn)行保存

也就是說(shuō)kafka的本質(zhì) 是一個(gè)數(shù)據(jù)存儲(chǔ)平臺(tái)泛烙,流平臺(tái) 理卑, 只是他在做消息發(fā)布,消息消費(fèi)的時(shí)候我們可以把他當(dāng)做消息中間件來(lái)用蔽氨。

而且kafka在設(shè)計(jì)之初就是采用分布式架構(gòu)設(shè)計(jì)的藐唠, 基于集群的方式工作,且可以自由伸縮鹉究,所以 kafka構(gòu)建集群非常簡(jiǎn)單

基本概念:

  • Broker : 和AMQP里協(xié)議的概念一樣宇立, 就是消息中間件所在的服務(wù)器
  • Topic(主題) : 每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic自赔。(物理上不同Topic的消息分開(kāi)存儲(chǔ)妈嘹,邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
  • Partition(分區(qū)) : Partition是物理上的概念,體現(xiàn)在磁盤上面绍妨,每個(gè)Topic包含一個(gè)或多個(gè)Partition.
  • Producer : 負(fù)責(zé)發(fā)布消息到Kafka broker
  • Consumer : 消息消費(fèi)者润脸,向Kafka broker讀取消息的客戶端。
  • Consumer Group(消費(fèi)者群組) : 每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name痘绎,若不指定group name則屬于默認(rèn)的group)津函。
  • offset 偏移量: 是kafka用來(lái)確定消息是否被消費(fèi)過(guò)的標(biāo)識(shí),在kafka內(nèi)部體現(xiàn)就是一個(gè)遞增的數(shù)字

kafka消息發(fā)送的時(shí)候 ,考慮到性能 可以采用打包方式發(fā)送孤页, 也就是說(shuō) 傳統(tǒng)的消息是一條一條發(fā)送尔苦, 現(xiàn)在可以先把需要發(fā)送的消息緩存在客戶端, 等到達(dá)一定數(shù)值時(shí)行施, 再一起打包發(fā)送允坚, 而且還可以對(duì)發(fā)送的數(shù)據(jù)進(jìn)行壓縮處理,減少在數(shù)據(jù)傳輸時(shí)的開(kāi)銷

kafka優(yōu)缺點(diǎn)

優(yōu)點(diǎn):

  • ? 基于磁盤的數(shù)據(jù)存儲(chǔ) ?

  • 高伸縮性 ?

  • 高性能

  • 應(yīng)用場(chǎng)景 :
    收集指標(biāo)和日志 ?
    提交日志
    流處理

缺點(diǎn):

  • ? 運(yùn)維難度大 ?
  • 偶爾有數(shù)據(jù)混亂的情況 ?
  • 對(duì)zookeeper強(qiáng)依賴 ?
  • 多副本模式下對(duì)帶寬有一定要求

kafka安裝&啟動(dòng)

kafka安裝的話蛾号,直接 從官網(wǎng)下載壓縮包下來(lái)解壓就可以了

注意的是稠项, 啟動(dòng)kafka要先啟動(dòng)zookeeper kafka默認(rèn)自帶了zookeeper 可以啟動(dòng)他自帶的 也可以自己另外使用

啟動(dòng)kafka需要執(zhí)行 kafka-server-start.bat 文件 然后 需要傳入一個(gè)路徑參數(shù) 就是你server.config文件的地址 一般情況下傳入../../config/server.properties 即可

剛剛提到的zookeeper kafka默認(rèn)的zookeeper 啟動(dòng)的話啟動(dòng)zookeeper-server-start.bat文件即可 同樣 要傳入路徑參數(shù):../../config/zookeeper.properties

server 參數(shù)解釋:

log.dirs: 日志文件存儲(chǔ)地址, 可以設(shè)置多個(gè)

num.recovery.threads.per.data.dir:用來(lái)讀取日志文件的線程數(shù)量鲜结,對(duì)應(yīng)每一個(gè)log.dirs 若此參數(shù)為2 log.dirs 為2個(gè)目錄 那么就會(huì)有4個(gè)線程來(lái)讀取

auto.create.topics.enable:是否自動(dòng)創(chuàng)建tiopic

num.partitions: 創(chuàng)建topic的時(shí)候自動(dòng)創(chuàng)建多少個(gè)分區(qū) (可以在創(chuàng)建topic的時(shí)候手動(dòng)指定)

log.retention.hours: 日志文件保留時(shí)間 超時(shí)即刪除

log.retention.bytes: 日志文件最大大小

log.segment.bytes: 當(dāng)日志文件達(dá)到一定大小時(shí)展运,開(kāi)辟新的文件來(lái)存儲(chǔ)(分片存儲(chǔ))

log.segment.ms: 同上 只是當(dāng)達(dá)到一定時(shí)間時(shí) 開(kāi)辟新的文件

message.max.bytes: broker能接收的最大消息大小(單條) 默認(rèn)1M

kafka基本管理操作命令:

列出所有主題

kafka-topics.bat --zookeeper localhost:2181/kafka --list

列出所有主題的詳細(xì)信息

kafka-topics.bat --zookeeper localhost:2181/kafka --describe

創(chuàng)建主題 主題名 my-topic,1副本精刷,8分區(qū)

kafka-topics.bat --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 8 --topic my-topic

增加分區(qū)拗胜,注意:分區(qū)無(wú)法被刪除

kafka-topics.bat --zookeeper localhost:2181/kafka --alter --topic my-topic --partitions 16

刪除主題

kafka-topics.bat --zookeeper localhost:2181/kafka --delete --topic my-topic

列出消費(fèi)者群組(僅Linux)

kafka-topics.sh --new-consumer --bootstrap-server localhost:9092/kafka --list

列出消費(fèi)者群組詳細(xì)信息(僅Linux)

kafka-topics.sh --new-consumer --bootstrap-server localhost:9092/kafka --describe --group 群組名

kafka java客戶端實(shí)戰(zhàn)

引入maven依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

注意 我這里已經(jīng)創(chuàng)建了一個(gè)叫 test-topic 的主題 如果你們沒(méi)創(chuàng)建先創(chuàng)建后再執(zhí)行代碼

生產(chǎn)者:

public class TestProducter {

    public static void main(String[] args) throws  Exception{
        Properties properties = new Properties();
        //指定kafka服務(wù)器地址 如果是集群可以指定多個(gè)  但是就算只指定一個(gè)他也會(huì)去集群環(huán)境下尋找其他的節(jié)點(diǎn)地 址
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        //key序列化器
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        //value序列化器
        properties.setProperty("value.serializer",StringSerializer.class.getName());
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<String, String>("test-topic",1,"testKey","hello");
        Future<RecordMetadata> send = kafkaProducer.send(stringStringProducerRecord);
        RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

}

消費(fèi)者:

public class TestCousmer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer",StringDeserializer.class.getName());
        properties.setProperty("group.id","1111");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("test-topic"));

        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(500);
            for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                System.out.println(stringStringConsumerRecord);
            }
        }
    }
}

kafka生產(chǎn)者參數(shù)詳解

acks:

? 至少要多少個(gè)分區(qū)副本接收到了消息返回確認(rèn)消息 一般是 0:只要消息發(fā)送出去了就確認(rèn)(不管是否失敗) 1:只要 有一個(gè)broker接收到了消息 就返回 all: 所有集群副本都接收到了消息確認(rèn) 當(dāng)然 2 3 4 5 這種數(shù)字都可以, 就是具體多少臺(tái)機(jī)器接收到了消息返回怒允, 但是一般這種情況很少用到

buffer.memory:

? 生產(chǎn)者緩存在本地的消息大小 : 如果生產(chǎn)者在生產(chǎn)消息的速度過(guò)快 快過(guò)了往 broker發(fā)送消息的速度 那么就會(huì)出現(xiàn)buffer.memory不足的問(wèn)題 默認(rèn)值為32M 注意 單位是byte 大概3355000左右

max.block.ms:

? 生產(chǎn)者獲取kafka元數(shù)據(jù)(集群數(shù)據(jù)埂软,服務(wù)器數(shù)據(jù)等) 等待時(shí)間 : 當(dāng)因網(wǎng)絡(luò)原因?qū)е驴蛻舳伺c服務(wù)器通訊時(shí)等待的時(shí)間超過(guò)此值時(shí) 會(huì)拋出一個(gè)TimeOutExctption 默認(rèn)值為 60000ms

retries:

? 設(shè)置生產(chǎn)者生產(chǎn)消息失敗后重試的次數(shù) 默認(rèn)值 3次

retry.backoff.ms:

? 設(shè)置生產(chǎn)者每次重試的間隔 默認(rèn) 100ms

batch.size:

? 生產(chǎn)者批次發(fā)送消息的大小 默認(rèn)16k 注意單位還是byte

linger.ms:

? 生產(chǎn)者生產(chǎn)消息后等待多少毫秒發(fā)送到broker 與batch.size 誰(shuí)先到達(dá)就根據(jù)誰(shuí) 默認(rèn)值為0

compression.type:

? kafka在壓縮數(shù)據(jù)時(shí)使用的壓縮算法 可選參數(shù)有:none、gzip纫事、snappy none即不壓縮 gzip,和snappy壓縮算法之間取舍的話 gzip壓縮率比較高 系統(tǒng)cpu占用比較大 但是帶來(lái)的好處是 網(wǎng)絡(luò)帶寬占用少勘畔, snappy壓縮比沒(méi)有g(shù)zip高 cpu占用率不是很高 性能也還行所灸, 如果網(wǎng)絡(luò)帶寬比較緊張的話 可以選擇gzip 一般推薦snappy

client.id:

? 一個(gè)標(biāo)識(shí), 可以用來(lái)標(biāo)識(shí)消息來(lái)自哪炫七, 不影響kafka消息生產(chǎn)

max.in.flight.requests.per.connection:

? 指定kafka一次發(fā)送請(qǐng)求在得到服務(wù)器回應(yīng)之前,可發(fā)送的消息數(shù)量

偏移量與偏移量提交

? 偏移量是kafka特別重要的一個(gè)概念特別是在消費(fèi)者端爬立, 我們之前也有簡(jiǎn)單提到過(guò)偏移量是拿來(lái)干嘛的.

偏移量是一個(gè)自增長(zhǎng)的ID 用來(lái)標(biāo)識(shí)當(dāng)前分區(qū)的哪些消息被消費(fèi)過(guò)了, 這個(gè)ID會(huì)保存在kafka的broker當(dāng)中 而且 消費(fèi)者本地也會(huì)存儲(chǔ)一份 因?yàn)槊看蜗M(fèi)每一條消息都要更新一下偏移量的話 難免會(huì)影響整個(gè)broker的吞吐量 所以一般 這個(gè)偏移量在每次發(fā)生改動(dòng)時(shí) 先由消費(fèi)者本地改動(dòng)诉字, 默認(rèn)情況下 消費(fèi)者每五秒鐘會(huì)提交一次改動(dòng)的偏移量懦尝, 這樣做雖然說(shuō)吞吐量上來(lái)了知纷, 但是可能會(huì)出現(xiàn)重復(fù)消費(fèi)的問(wèn)題: 因?yàn)榭赡茉谙乱淮翁峤黄屏恐? 消費(fèi)者本地消費(fèi)了一些消息壤圃,然后發(fā)生了分區(qū)再均衡(分區(qū)再均衡在下面有講) 那么就會(huì)出現(xiàn)一個(gè)問(wèn)題 假設(shè)上次提交的偏移量是 2000 在下一次提交之前 其實(shí)消費(fèi)者又消費(fèi)了500條數(shù)據(jù) 也就是說(shuō)當(dāng)前的偏移量應(yīng)該是2500 但是這個(gè)2500只在消費(fèi)者本地, 也就是說(shuō) 假設(shè)其他消費(fèi)者去消費(fèi)這個(gè)分區(qū)的時(shí)候拿到的偏移量是2000 那么又會(huì)從2000開(kāi)始消費(fèi)消息 那么 2000到2500之間的消息又會(huì)被消費(fèi)一遍,這就是重復(fù)消費(fèi)的問(wèn)題.

kafka對(duì)于這種問(wèn)題也提供了解決方案:手動(dòng)提交

你可以關(guān)閉默認(rèn)的自動(dòng)提交(enable.auto.commit= false) 然后使用kafka提供的API來(lái)進(jìn)行偏移量提交:
卡夫卡提供了兩種方式提交你的偏移量 :同步和異步

//同步提交偏移量
kafkaConsumer.commitSync();
//異步提交偏移量
kafkaConsumer.commitAsync();

他們之間的區(qū)別在于 同步提交偏移量會(huì)等待服務(wù)器應(yīng)答 并且遇到錯(cuò)誤會(huì)嘗試重試琅轧,但是會(huì)一定程度上影響性能不過(guò)能確保偏移量到底提交成功與否

而異步提交的對(duì)于性能肯定是有提示的 但是弊端也就像我們剛剛所提到 遇到錯(cuò)誤沒(méi)辦法重試 因?yàn)榭赡茉谑盏侥氵@個(gè)結(jié)果的時(shí)候又提交過(guò)偏移量了 如果這時(shí)候重試 又會(huì)導(dǎo)致消息重復(fù)的問(wèn)題了..

其實(shí) 我們可以采用同步+異步的方式來(lái)保證提交的正確性以及服務(wù)器的性能

因?yàn)? 異步提交的話 如果出現(xiàn)問(wèn)題但是不是致命問(wèn)題的話 可能下一次提交就不會(huì)出現(xiàn)這個(gè)問(wèn)題了伍绳, 所以 有些異常是不需要解決的(可能單純的就是網(wǎng)絡(luò)抽風(fēng)了呢? ) 所以 我們平時(shí)可以采用異步提交的方式 等到消費(fèi)者中斷了(遇到了致命問(wèn)題,或是強(qiáng)制中斷消費(fèi)者) 的時(shí)候再使用同步提交(因?yàn)檫@次如果失敗了就沒(méi)有下次了... 所以要讓他重試) 乍桂。

具體代碼:

try {
    while (true) {
        ConsumerRecords<String, String> poll = kafkaConsumer.poll(500);
        for (ConsumerRecord<String, String> context : poll) {
            System.out.println("消息所在分區(qū):" + context.partition() + "-消息的偏移量:" + context.offset()
                    + "key:" + context.key() + "value:" + context.value());
        }
        //正常情況異步提交
         kafkaConsumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        //當(dāng)程序中斷時(shí)同步提交
        kafkaConsumer.commitSync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //關(guān)閉當(dāng)前消費(fèi)者  具體在下面有講
        kafkaConsumer.close();
    }

}

值得一提的是 在手動(dòng)提交時(shí)kafka提供了你可以傳入具體的偏移量來(lái)完成提交 也就是指定偏移量提交,但是非常不建議手動(dòng)指定 因?yàn)槿绻付ǖ钠屏?小于 分區(qū)所存儲(chǔ)的偏移量大小的話 那么會(huì)導(dǎo)致消息重復(fù)消費(fèi)冲杀, 如果指定的偏移量大于分區(qū)所存儲(chǔ)的偏移量的話,那么會(huì)導(dǎo)致消息丟失.

代碼:

Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
//我這里就指定了test-topic這個(gè)主題下的分區(qū)1  OffsetAndMetadata:第一個(gè)參數(shù)為你要提交的偏移量 第二個(gè)參數(shù)可以選擇性的傳入業(yè)務(wù)ID 可以拿來(lái)確定這次提交  這里我直接提交偏移量為0 那么會(huì)導(dǎo)致下個(gè)消費(fèi)者或者說(shuō)分區(qū)再均衡之后再來(lái)讀取這個(gè)分區(qū)的數(shù)據(jù)會(huì)從第一條開(kāi)始讀取
offset.put(new TopicPartition("test-topic", 1), new OffsetAndMetadata(0, "1"));
//指定偏移量提交 參數(shù)為map集合  key為指定的主題下的分區(qū)睹酌,value 為你要提交的偏移量
kafkaConsumer.commitSync(offset);

Rebalance 分區(qū)再均衡

這也是kafka里面非常重要的一個(gè)概念

首先 Rebalance 是一個(gè)操作 在以下情況下會(huì)觸發(fā)Rebalance 操作:

  1. 組成員發(fā)生變更(新consumer加入組权谁、已有consumer主動(dòng)離開(kāi)組或已有consumer崩潰了)
  2. 訂閱主題數(shù)發(fā)生變更,如果你使用了正則表達(dá)式的方式進(jìn)行訂閱憋沿,那么新建匹配正則表達(dá)式的topic就會(huì)觸發(fā)rebalance
  3. 訂閱主題的分區(qū)數(shù)發(fā)生變更

當(dāng)觸發(fā)Rebalance kafka重新分配分區(qū)所有權(quán)

何為分區(qū)所有權(quán)旺芽? 我們之前有提到過(guò), 消費(fèi)者有一個(gè)消費(fèi)者組的概念辐啄, 而且一個(gè)消費(fèi)者組在消費(fèi)一個(gè)主題時(shí)有以下規(guī)則
一個(gè)消費(fèi)者可以消費(fèi)多個(gè)分區(qū) 但是一個(gè)分區(qū)只能被一個(gè)消費(fèi)者消費(fèi) 如果 我有分區(qū) 0 1 2 現(xiàn)在有消費(fèi)者 A采章,B 那么 kafka可能會(huì)讓消費(fèi)者A 消費(fèi) 0,1 這2個(gè)分區(qū) 那么 這時(shí)候 我們就會(huì)說(shuō) 消費(fèi)者A 擁有 分區(qū) 0,1的所有權(quán)壶辜。

當(dāng)觸發(fā) Rebalance 的時(shí)候 kafka會(huì)重新分配這個(gè)所有權(quán) 還是基于剛剛的比方 消費(fèi)者A 擁有 0 和1 的所有權(quán) 消費(fèi)者B 會(huì)有2的所有權(quán) 當(dāng)消費(fèi)者B離開(kāi)kafka的時(shí)候 這時(shí)候 kafka會(huì)重新分配一下所有權(quán) 此時(shí) 整個(gè)消費(fèi)者組只有一個(gè)A 那么 0 1 2 三個(gè)分區(qū)的所有權(quán)都會(huì)屬于A 同理 如果這時(shí)候有消費(fèi)者C進(jìn)入這個(gè)消費(fèi)者組 那么 這時(shí)候kafka會(huì)確保每一個(gè)消費(fèi)者都能消費(fèi)一個(gè)分區(qū).

當(dāng)觸發(fā)Rebalance時(shí) 由于kafka正在分配所有權(quán) 會(huì)導(dǎo)致消費(fèi)者不能消費(fèi)悯舟, 而且 還會(huì)引發(fā)一個(gè)重復(fù)消費(fèi)的問(wèn)題, 當(dāng)

消費(fèi)者還沒(méi)來(lái)得及提交偏移量時(shí) 分區(qū)所有權(quán)遭到了重新分配 那么這時(shí)候就會(huì)導(dǎo)致一個(gè)消息被多個(gè)消費(fèi)者重復(fù)消費(fèi)

那么 解決方案就是 在消費(fèi)者訂閱時(shí)砸民, 添加一個(gè)再均衡監(jiān)聽(tīng)器抵怎, 也就是當(dāng)kafka在做Rebalance 操作前后 均會(huì)調(diào)用再均衡監(jiān)聽(tīng)器 那么這時(shí)候 我們可以在kafka Rebalance之前提交我們消費(fèi)者最后處理的消息來(lái)解決這個(gè)問(wèn)題。

Close():

當(dāng)我們不需要某個(gè)消費(fèi)者繼續(xù)消費(fèi)kafka當(dāng)中的數(shù)據(jù)時(shí)岭参, 我們可以選擇調(diào)用Close方法來(lái)關(guān)閉它反惕,在關(guān)閉之前 close方法會(huì)發(fā)送一個(gè)通知告訴kafka我這個(gè)消費(fèi)者要退出了, 那么 kafka就會(huì)準(zhǔn)備Rebalance 而且如果是采用的自動(dòng)提交偏移量 消費(fèi)者自身也會(huì)在關(guān)閉自己之前提交最后所消費(fèi)的偏移量 冗荸。

當(dāng)然 即使沒(méi)有調(diào)用close方法 而是直接強(qiáng)制中斷了消費(fèi)者的進(jìn)程 kafka也會(huì)根據(jù)我們后面會(huì)說(shuō)到的系統(tǒng)參數(shù)捕捉到消費(fèi)者退出了承璃。

?

獨(dú)立消費(fèi)者:

kafka支持這樣的需求: 可能你的消費(fèi)者不想訂閱某個(gè)主題 也不想加入什么消費(fèi)組 只想訂閱某個(gè)(多個(gè))主題下的某個(gè)(多個(gè))分區(qū)。

那么可以采用分配的方式蚌本, 而不是訂閱 盔粹, 我們之前講的都是基于消費(fèi)組訂閱某個(gè)主題來(lái)完成消息的消費(fèi)隘梨, 那么你訂閱的主題有哪些分區(qū)的消息是屬于你的 這個(gè)是kafka來(lái)分配的 而不是你自己決定的 那么我們可以換為自己分配的方式來(lái)完成消息的消費(fèi):

List<TopicPartition> list = new ArrayList<>();
//new出一個(gè)分區(qū)對(duì)象 聲明這個(gè)分區(qū)是哪個(gè)topic下面的哪個(gè)分區(qū)
list.add(new TopicPartition("test-topic",0));
//分配這個(gè)消費(fèi)者所需要消費(fèi)的分區(qū), 傳入一個(gè)分區(qū)對(duì)象集合
kafkaConsumer.assign(list);

消費(fèi)者參數(shù):

fetch.min.bytes:

? 該屬性指定了消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費(fèi)者的數(shù)據(jù)請(qǐng)求時(shí)舷嗡,如果可用的數(shù)據(jù)量小于 fetch.min.bytes 指定的大小轴猎,那么它會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。這樣可以降低消費(fèi)者和 broker 的工作負(fù)載进萄,因?yàn)樗鼈冊(cè)谥黝}不是很活躍的時(shí)候(或者一天里的低谷時(shí)段)就不需要來(lái)來(lái)回回地處理消息捻脖。如果沒(méi)有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率卻很高中鼠,那么就需要把該屬性的值設(shè)得比默認(rèn)值大可婶。如果消費(fèi)者的數(shù)量比較多,把該屬性的值設(shè)置得大一點(diǎn)可以降低 broker 的工作負(fù)載援雇。
默認(rèn)值為1 byte

fetch.max.wait.ms

? 我們通過(guò) fetch.min.bytes 告訴 Kafka矛渴,等到有足夠的數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。而 feth.max.wait.ms 則用于指定 broker 的等待時(shí)間惫搏,默認(rèn)是如果沒(méi)有足夠的數(shù)據(jù)流入Kafka具温,消費(fèi)者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導(dǎo)致 500ms 的延遲筐赔。如果 fetch.max.wait.ms 被設(shè)為 100ms铣猩,并且 fetch.min.bytes 被設(shè)為 1MB,那么 Kafka 在收到消費(fèi)者的請(qǐng)求后茴丰,要么返回 1MB 數(shù)據(jù)达皿,要么在 100ms 后返回所有可用的數(shù)據(jù),就看哪個(gè)條件先得到滿足较沪。
默認(rèn)值為500ms

max.partition.fetch.bytes

? 該屬性指定了服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)鳞绕。默認(rèn)值是 1MB

session.timeout.ms 和heartbeat.interval.ms

session.timeout.ms :

? 消費(fèi)者多久沒(méi)有發(fā)送心跳給服務(wù)器服務(wù)器則認(rèn)為消費(fèi)者死亡/退出消費(fèi)者組 默認(rèn)值:10000ms

heartbeat.interval.ms :

? 消費(fèi)者往kafka服務(wù)器發(fā)送心跳的間隔 一般設(shè)置為session.timeout.ms的三分之一 默認(rèn)值:3000ms

auto.offset.reset:

? 當(dāng)消費(fèi)者本地沒(méi)有對(duì)應(yīng)分區(qū)的offset時(shí) 會(huì)根據(jù)此參數(shù)做不同的處理 默認(rèn)值為:latest

earliest

? 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi)尸曼;無(wú)提交的offset時(shí)们何,從頭開(kāi)始消費(fèi)

latest

? 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi)控轿;無(wú)提交的offset時(shí)冤竹,消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)

none

? topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi)茬射;只要有一個(gè)分區(qū)不存在已提交的offset鹦蠕,則拋出異常

enable.auto.commit

? 該屬性指定了消費(fèi)者是否自動(dòng)提交偏移量,默認(rèn)值是 true在抛。為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失钟病,可以把它設(shè)為 false,由自己控制何時(shí)提交偏移量。如果把它設(shè)為 true肠阱,還可以通過(guò)配置 auto.commit.interval.ms 屬性來(lái)控制提交的頻率票唆。

partition.assignment.strategy

? PartitionAssignor 根據(jù)給定的消費(fèi)者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個(gè)消費(fèi)者屹徘。Kafka 有兩個(gè)默認(rèn)的分配策略走趋。

  • ? Range:該策略會(huì)把主題的若干個(gè)連續(xù)的分區(qū)分配給消費(fèi)者。假設(shè)消費(fèi)者 C1 和消費(fèi)者 C2 同時(shí)訂閱了主題 T1 和主題 T2噪伊,并且每個(gè)主題有 3 個(gè)分區(qū)簿煌。那么消費(fèi)者 C1 有可能分配到這兩個(gè)主題的分區(qū) 0 和分區(qū) 1,而消費(fèi)者 C2 分配到這兩個(gè)主題的分區(qū)2鉴吹。因?yàn)槊總€(gè)主題擁有奇數(shù)個(gè)分區(qū)姨伟,而分配是在主題內(nèi)獨(dú)立完成的,第一個(gè)消費(fèi)者最后分配到比第二個(gè)消費(fèi)者更多的分區(qū)拙寡。只要使用了 Range 策略授滓,而且分區(qū)數(shù)量無(wú)法被消費(fèi)者數(shù)量整除琳水,就會(huì)出現(xiàn)這種情況肆糕。
  • ? RoundRobin:該策略把主題的所有分區(qū)逐個(gè)分配給消費(fèi)者。如果使用 RoundRobin 策略來(lái)給消費(fèi)者 C1 和消費(fèi)者 C2 分配分區(qū)在孝,那么消費(fèi)者 C1 將分到主題 T1 的分區(qū) 0 和分區(qū) 2 以及主題 T2 的分區(qū) 1诚啃,消費(fèi)者 C2 將分配到主題 T1 的分區(qū) 1 以及主題 T2 的分區(qū) 0 和分區(qū) 2。一般來(lái)說(shuō)私沮,如果所有消費(fèi)者都訂閱相同的主題(這種情況很常見(jiàn))始赎,RoundRobin 策略會(huì)給所有消費(fèi)者分配相同數(shù)量的分區(qū)(或最多就差一個(gè)分區(qū))。

max.poll.records

單次調(diào)用 poll() 方法最多能夠返回的記錄條數(shù) ,默認(rèn)值 500

receive.buffer.bytes 和 send.buffer.bytes

? receive.buffer.bytes 默認(rèn)值 64k 單位 bytes

? send.buffer.bytes 默認(rèn)值 128k 單位 bytes

? 這兩個(gè)參數(shù)分別指定了 TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小仔燕。如果它們被設(shè)為 -1

使用java來(lái)操作kafka管理命令

? 首先 得引入一個(gè)依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
</dependency>

? 我們之前所引入的依賴是kafka客戶端的依賴 這個(gè)是另外的依賴 不是一回事

創(chuàng)建topic

?

public static void createTopic(){
    ZkUtils zkUtils = ZkUtils.apply("localhost:2181/kafka", 30000, 30000, JaasUtils.isZkSecurityEnabled());
    System.out.println(JaasUtils.isZkSecurityEnabled());
    AdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), AdminUtils.createTopic$default$6());
    zkUtils.close();
}

刪除topic

public static  void deleteTopic(){
    ZkUtils zkUtils = ZkUtils.apply("localhost:2181/kafka", 30000, 30000, JaasUtils.isZkSecurityEnabled());
    AdminUtils.deleteTopic(zkUtils, "t1");
    zkUtils.close();
}

列出所有topic

public  static void listTopic(){
    ZkUtils zkUtils = ZkUtils.apply("localhost:2181/kafka", 30000, 30000, JaasUtils.isZkSecurityEnabled());
    List<String> list = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
    for (String s : list) {
        System.out.println(s);
    }
    zkUtils.close();
}

?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末造垛,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子晰搀,更是在濱河造成了極大的恐慌五辽,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件外恕,死亡現(xiàn)場(chǎng)離奇詭異杆逗,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)鳞疲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門罪郊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人尚洽,你說(shuō)我怎么就攤上這事悔橄。” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵癣疟,是天一觀的道長(zhǎng)尺铣。 經(jīng)常有香客問(wèn)我,道長(zhǎng)争舞,這世上最難降的妖魔是什么凛忿? 我笑而不...
    開(kāi)封第一講書人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮竞川,結(jié)果婚禮上店溢,老公的妹妹穿的比我還像新娘。我一直安慰自己委乌,他們只是感情好床牧,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著遭贸,像睡著了一般戈咳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上壕吹,一...
    開(kāi)封第一講書人閱讀 49,111評(píng)論 1 285
  • 那天著蛙,我揣著相機(jī)與錄音,去河邊找鬼耳贬。 笑死踏堡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的咒劲。 我是一名探鬼主播顷蟆,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼腐魂!你這毒婦竟也來(lái)了帐偎?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蛔屹,失蹤者是張志新(化名)和其女友劉穎削樊,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體判导,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡嫉父,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了眼刃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绕辖。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖擂红,靈堂內(nèi)的尸體忽然破棺而出仪际,到底是詐尸還是另有隱情围小,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布树碱,位于F島的核電站肯适,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏成榜。R本人自食惡果不足惜框舔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望赎婚。 院中可真熱鬧刘绣,春花似錦、人聲如沸挣输。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)撩嚼。三九已至停士,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間完丽,已是汗流浹背恋技。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留舰涌,地道東北人猖任。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像瓷耙,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子刁赖,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容