Kafka 入門

1 Kafka概述

1.1 定義

Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列杰妓,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域狈定。

應(yīng)用場(chǎng)景:

解耦

異步

削峰

1.2 消息隊(duì)列

1.2.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場(chǎng)景

image
image

1.2.2 消息隊(duì)列的兩種模式

點(diǎn)對(duì)點(diǎn)模式:

消息生產(chǎn)者生產(chǎn)消息發(fā)送到Queue中,然后消息消費(fèi)者從Queue中取出并且消費(fèi)消息,消息被消費(fèi)以后税弃,Queue中不再有存儲(chǔ),所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息讹语,Queue支持存在多個(gè)消費(fèi)者钙皮,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)顽决。

image

發(fā)布訂閱模式:

消息生產(chǎn)者將消息發(fā)布到Topic短条,同時(shí)有多個(gè)消息消費(fèi)者該消息,和點(diǎn)對(duì)點(diǎn)不同的是才菠,發(fā)布到Topic中的消息會(huì)被所有訂閱者消費(fèi)茸时。

image

1.3 基礎(chǔ)架構(gòu)

image

Producer:消息生產(chǎn)者,就是向Kafka Broker發(fā)消息的客戶端

Consumer:消息消費(fèi)者赋访,向Kafka Broker取消息的客戶端

Consumer Group (CG):消費(fèi)者組可都,由多個(gè)Consumer組成,消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù)蚓耽,一個(gè)分區(qū)只能由一個(gè)消費(fèi)者消費(fèi)渠牲,消費(fèi)者組之間互不影響,所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組步悠,即消費(fèi)者組是邏輯上的一個(gè)訂閱者

Broker:一臺(tái)Kafka服務(wù)器就是一個(gè)Broker签杈,一個(gè)集群由多個(gè)Broker組成,一個(gè)Broker可以容納多個(gè)Topic

Topic:可以理解為一個(gè)隊(duì)列鼎兽,生產(chǎn)者和消費(fèi)者面向的都是一個(gè)Topic

Partition:為了實(shí)現(xiàn)擴(kuò)展性答姥,一個(gè)非常大的Topic可以分布到多個(gè)Broker(即服務(wù)器)上,一個(gè)Topic可以分為多個(gè)Partition谚咬,每個(gè)Partition是一個(gè)有序的隊(duì)列

Replica:副本鹦付,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的Partition數(shù)據(jù)不丟失择卦,且Kafka仍然能夠繼續(xù)工作敲长,Kafka提供了副本機(jī)制,一個(gè)Topic的每個(gè)分區(qū)都有若干個(gè)副本秉继,一個(gè)leader和若干個(gè)follower

leader:每個(gè)分區(qū)多個(gè)副本的主潘明,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是leader

follower:每個(gè)分區(qū)多個(gè)副本中的從秕噪,實(shí)時(shí)從leader中同步數(shù)據(jù),保持和leader數(shù)據(jù)的同步厚宰,leader發(fā)生故障時(shí)腌巾,某個(gè)follower會(huì)成為新的follower

2 Kafka快速入門

2.1 安裝部署

1遂填、解壓

[djm@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2、修改解壓后的文件夾名稱

[djm@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka

3澈蝙、在/opt/module/kafka目錄下創(chuàng)建logs文件夾

[djm@hadoop102 kafka]$ mkdir logs

4吓坚、修改配置文件

[djm@hadoop102 kafka]$ vi config/server.properties

修改以下內(nèi)容

#broker的全局唯一編號(hào),不能重復(fù)
broker.id=0
#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka運(yùn)行日志存放的路徑 
log.dirs=/opt/module/kafka/logs
#topic在當(dāng)前broker上的分區(qū)個(gè)數(shù)
num.partitions=1
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長(zhǎng)時(shí)間灯荧,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

5礁击、分發(fā)

[djm@hadoop102 kafka]$ xsync kafka

6、修改其他Brokerbroker.id

7逗载、Kafka群起腳本

[djm@hadoop102 kafka]$ vim start-kafka
for i in `cat /opt/module/hadoop-2.7.2/etc/hadoop/slaves`
do
    echo "========== $i =========="
    ssh $i 'source /etc/profile&&/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
    echo $?
done
[djm@hadoop102 kafka]$ chmod 777 start-kafka
[djm@hadoop102 kafka]$ sudo mv start-kafka /bin

8哆窿、啟動(dòng)Kafka集群

[djm@hadoop102 kafka]$ start-kafka

2.2 命令行操作

1、查看所有Topic

[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

2厉斟、創(chuàng)建Topic

[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
#--topic 定義topic名
#--replication-factor 定義副本數(shù)
#--partitions 定義分區(qū)數(shù)

--topic 定義topic名
--replication-factor 定義副本數(shù)
--partitions 定義分區(qū)數(shù)

3挚躯、刪除Topic

[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

4、發(fā)送消息

[djm@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

5擦秽、消費(fèi)消息

[djm@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

--from-beginning 會(huì)把topic中以往所有的消息消費(fèi)出來(lái)

6码荔、查看Topic詳細(xì)信息

[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

7、修改分區(qū)數(shù)

[djm@hadoop102 kafka]$bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6

分區(qū)數(shù)只能增加感挥,不能減少

3 Kafka架構(gòu)

3.1 Kafka工作流程及文件存儲(chǔ)機(jī)制

image

Kafka中消息是以Topic進(jìn)行分類的缩搅,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息触幼,都是面向Topic的硼瓣;

Topic是邏輯上的概念,而Partition是物理上的概念域蜗,每個(gè)Partition對(duì)應(yīng)于一個(gè)log文件巨双,該log文件中存儲(chǔ)的就是Producer生產(chǎn)的數(shù)據(jù);

Producer生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該log文件末端霉祸,且每條數(shù)據(jù)都有自己的offset筑累,消費(fèi)者組中的每個(gè)消費(fèi)者,都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset丝蹭,以便出錯(cuò)恢復(fù)時(shí)慢宗,從上次的位置繼續(xù)消費(fèi)。

image

由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷的追加到log文件末尾奔穿,為了防止文件過大而導(dǎo)致數(shù)據(jù)定位效率低下镜沽,Kafka采取了分片和索引機(jī)制,將每個(gè)Partiton分為多個(gè)segment贱田,每個(gè)segment對(duì)應(yīng)兩個(gè)文件缅茉,分別是.log.index,這些文件位于同一個(gè)文件夾下男摧,文件夾的命名規(guī)則為Topic名稱+Partiton序號(hào)蔬墩,.log.index文件以當(dāng)前segment的第一條消息的offset命名译打,index存儲(chǔ)索引信息,.log存儲(chǔ)數(shù)據(jù)信息拇颅,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址奏司。

image

3.2 Producer

3.2.1 分區(qū)策略

為什么要進(jìn)行分區(qū)?

  • 方便在群集中擴(kuò)展樟插,每個(gè)Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器韵洋,而一個(gè)Topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了
  • 可以提高并發(fā)

分區(qū)的原則是什么黄锤?

我們需要將Producer發(fā)送的數(shù)據(jù)封裝成一個(gè)ProducerRecord對(duì)象:

  • 指明Partition的情況下搪缨,直接將指明的值直接作為Partition
  • 沒有指明Partition值但有key的情況下,將keyhash值與TopicPartition數(shù)進(jìn)行取余得到 Partition
  • 既沒有Partition值又沒有key值的情況下猜扮,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增)勉吻,將這個(gè)值與Topic可用的Partition總數(shù)取余得到Partition值,也就是常說(shuō)的round-robin算法

3.2.2 數(shù)據(jù)可靠性保證

為保證Partition發(fā)送的數(shù)據(jù)旅赢,能可靠的發(fā)送到指定的Topic齿桃,Topic的每個(gè)Partition收到Producer發(fā)送的數(shù)據(jù)后,都需要向Producer發(fā)送ackacknowledgement確認(rèn)收到)煮盼,如果Producer收到ack短纵,就會(huì)進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)僵控。

image

副本數(shù)據(jù)同步策略:

方案 優(yōu)點(diǎn) 缺點(diǎn)
半數(shù)以上完成同步香到,就發(fā)送ack 延遲低 選舉新的leader時(shí),容忍n臺(tái)節(jié)點(diǎn)的故障报破,需要2n+1個(gè)副本
全部完成同步悠就,才發(fā)送ack 選舉新的leader時(shí),容忍n臺(tái)節(jié)點(diǎn)的故障充易,需要n+1個(gè)副本 延遲高

Kafka選擇了第二種方案梗脾,原因如下:

同樣為了容忍n臺(tái)節(jié)點(diǎn)的故障,第一種方案需要2n+1個(gè)副本盹靴,而第二種方案只需要n+1個(gè)副本炸茧,而Kafka的每個(gè)Partition存儲(chǔ)大量的數(shù)據(jù),這樣會(huì)造成大量的數(shù)據(jù)冗余;

雖然第二種方案的延遲會(huì)比較高,但是相比而言延遲對(duì)Kafka的影響較小空执。

采用第二種方案后,leader收到數(shù)據(jù)控漠,所有的follower都開始同步數(shù)據(jù),但是有一個(gè)follower悬钳,因?yàn)槟撤N故障润脸,遲遲不能與leader同步柬脸,那leader就要一直等下去,直到它同步完才能發(fā)送ack毙驯,這個(gè)問題怎么解決呢?

leader維護(hù)了一個(gè)動(dòng)態(tài)的in-syncreplica set (ISR),意為和leader保持同步的follower集合灾测,當(dāng)ISR中的follower完成數(shù)據(jù)的同步之后爆价,leader就會(huì)給follower發(fā)送ack,如果follower長(zhǎng)時(shí)間未向leader同步數(shù)據(jù)媳搪,則該follower將被踢出ISR铭段,該時(shí)間閾值由replica.lag.time.max.ms參數(shù)設(shè)定,leader發(fā)生故障之后秦爆,就會(huì)從ISR中選舉新的leader序愚。

ack應(yīng)答機(jī)制:

對(duì)于某些不重要的數(shù)據(jù),能夠容忍少量數(shù)據(jù)的丟失等限,所以沒必要等ISR中的所有follower全部同步完成

所以Kafka提供了三種可靠性級(jí)別爸吮,根據(jù)對(duì)可靠性和延遲的要求權(quán)衡,分別是:

  • 0 Producer不等待Brokerack望门,這一操作提供了最低的延遲形娇,Broker一接收到還沒有落盤就已經(jīng)返回,當(dāng)Broker故障時(shí)可能會(huì)丟失數(shù)據(jù)
  • 1 Producer等待Brokerack筹误,Partitionleader落盤成功后返回ack桐早,如果在follower同步成功之前leader故障,那么將會(huì)丟失數(shù)據(jù)
  • -1 Producer等待Brokerack厨剪,Partitionleaderfollower全部落盤成功后才返回ack哄酝,但是如果在follower同步完成后,Broker發(fā)送ack之前祷膳,leader發(fā)生故障陶衅,那么會(huì)造成數(shù)據(jù)重復(fù)

故障處理:

image

follower掛了被會(huì)暫時(shí)提出ISR,等到follower恢復(fù)后钾唬,follower會(huì)讀取本地磁盤記錄上次的HW万哪,并將log文件中高于HW的部分截取掉,從HW開始向leader進(jìn)行同步抡秆,等leaderLEO高于PartitionHW奕巍,就可以被重新加入ISR

leader發(fā)生故障之后,會(huì)從ISR中選出一個(gè)新的leader儒士,為保證多個(gè)副本之間的數(shù)據(jù)一致性的止,每個(gè)leader會(huì)將各自log文件中高于HW的數(shù)據(jù)切掉,然后從新的leader同步數(shù)據(jù)

3.3.3 Exactly Once語(yǔ)義

對(duì)于某些比較重要的消息着撩,我們需要保證Exactly Once語(yǔ)義诅福,即保證每條消息被發(fā)送且僅被發(fā)送一次

0.11版本之后匾委,Kafka引入了冪等性機(jī)制(idempotent),配合acks = -1時(shí)的at least once語(yǔ)義氓润,實(shí)現(xiàn)了ProducerBrokerExactly once語(yǔ)義

idempotent + at least once = exactly once

使用時(shí)赂乐,只需將enable.idempotence屬性設(shè)置為trueKafka自動(dòng)將acks屬性設(shè)為-1

3.3 Consumer

3.3.1 消費(fèi)方式

Consumer采取pull的方式從Broker中讀取數(shù)據(jù)

為什么采用pull方式呢咖气?

因?yàn)?code>push模式很難適應(yīng)不同速率的Consumer挨措,因此發(fā)送速率是由Broker決定的,它的目的就是盡可能快的傳遞消息崩溪,但是這樣容易造成Consumer來(lái)不及處理消息浅役,典型的表現(xiàn)就是網(wǎng)絡(luò)擁堵以及拒絕服務(wù),而poll模式則可以根據(jù)Consumer的消費(fèi)能力消費(fèi)消息伶唯。

但是poll也有不足觉既,就是如果隊(duì)列中沒有消息,Consumer可能陷入循環(huán)中乳幸,一直返回空數(shù)據(jù)瞪讼,針對(duì)這個(gè)缺點(diǎn),Consumer在消費(fèi)數(shù)據(jù)時(shí)會(huì)傳入一個(gè)timeout反惕,如果當(dāng)前沒有消息可供消費(fèi)尝艘,Consumer會(huì)等待一段時(shí)間再返回,這段時(shí)間就是timeout姿染。

3.3.2 分區(qū)分配策略

Kafka有兩種分配策略背亥,分別是:

image
image

3.3.3 offset維護(hù)

由于Consumer在消息過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,Consumer恢復(fù)后悬赏,需要從故障的位置繼續(xù)消費(fèi)狡汉,所以Consumer需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset

0.9以前,Consumer默認(rèn)將offset保存在ZK

0.9以后闽颇,Consumer默認(rèn)將offset保存在Kafka一個(gè)內(nèi)置的Topic盾戴,該Topic__consumer_offsets

3.4 Kafka高效讀取數(shù)據(jù)

順序?qū)懘疟P

KafkaProducer生產(chǎn)數(shù)據(jù),要寫入到log文件中兵多,寫的過程是一直追加到文件末端

零拷貝技術(shù)

image

3.5 ZookeeperKafka中的作用

Kafka集群中有一個(gè)broker會(huì)被選舉為Controller尖啡,負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作剩膘。

Controller的管理工作都是依賴于Zookeeper的衅斩。

以下為partition的leader選舉過程:

image
image

4 Kafka API

4.1 Producer API

4.1.1 消息發(fā)送流程

KafkaProducer發(fā)送消息采用的是異步發(fā)送的方式,在消息發(fā)送的過程中怠褐,涉及到了兩個(gè)線程——main線程和Sender線程畏梆,以及一個(gè)線程共享變量——RecordAccumulatormain線程將消息發(fā)送給RecordAccumulatorSender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker奠涌。

image

相關(guān)參數(shù):

batch.size:只有數(shù)據(jù)積累到batch.size之后宪巨,sender才會(huì)發(fā)送數(shù)據(jù)

linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.sizesender等待linger.time之后就會(huì)發(fā)送數(shù)據(jù)

相關(guān)類:

KafkaProducer:需要?jiǎng)?chuàng)建一個(gè)生產(chǎn)者對(duì)象溜畅,用來(lái)發(fā)送數(shù)據(jù)

ProducerConfig:獲取所需的一系列配置參數(shù)

ProducerRecord:每條數(shù)據(jù)都要封裝成一個(gè)ProducerRecord對(duì)象

導(dǎo)入依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

4.1.2 異步發(fā)送

package com.djm.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success -> " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

4.1.3 同步發(fā)送

package com.djm.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success -> " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            }).get();
        }
        producer.close();
    }
}

4.2 Consumer API

Consumer消費(fèi)數(shù)據(jù)時(shí)的可靠性是很容易保證的捏卓,因?yàn)閿?shù)據(jù)在Kafka中是持久化的,故不用擔(dān)心數(shù)據(jù)丟失問題慈格。

由于Consumer在消費(fèi)過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障天吓,Consumer恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi)峦椰,所以Consumer需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset,以便故障恢復(fù)后繼續(xù)消費(fèi)汰规。

所以offset的維護(hù)是Consumer消費(fèi)數(shù)據(jù)是必須考慮的問題汤功。

相關(guān)類:

KafkaConsumer:需要?jiǎng)?chuàng)建一個(gè)消費(fèi)者對(duì)象,用來(lái)消費(fèi)數(shù)據(jù)

ConsumerConfig:獲取所需的一系列配置參數(shù)

ConsuemrRecord:每條數(shù)據(jù)都要封裝成一個(gè)ConsumerRecord對(duì)象

導(dǎo)入依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

4.2.1 手動(dòng)提交offset

package com.djm.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}

手動(dòng)提交offset的方法有兩種:

  • commitSync(同步提交):將本次poll的一批數(shù)據(jù)最高的偏移量提交溜哮,失敗重試滔金,一直到提交成功
  • commitAsync(異步提交):將本次poll的一批數(shù)據(jù)最高的偏移量提交,沒有失敗重試機(jī)制茂嗓,有可能提交失敗

4.2.2 自動(dòng)提交offset

自動(dòng)提交offset的相關(guān)參數(shù):

enable.auto.commit:是否開啟自動(dòng)提交offset功能

auto.commit.interval.ms:自動(dòng)提交offset的時(shí)間間隔

package com.djm.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

4.3 自定義Interceptor

Interceptor是在Kafka 0.10版本被引入的餐茵,主要用于實(shí)現(xiàn)Client端的定制化控制邏輯。

對(duì)于Producer而言述吸,Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求忿族,比如修改消息等,同時(shí)蝌矛,Producer允許用戶指定多個(gè)Interceptor按序作用于同一條消息從而形成一個(gè)Interceptorchain道批。

Interceptor的實(shí)現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

  • configure(configs):獲取配置信息和初始化數(shù)據(jù)時(shí)調(diào)用
  • onSend(ProducerRecord)Producer確保在消息被序列化以及計(jì)算分區(qū)前調(diào)用該方法入撒,用戶可以在該方法中對(duì)消息做任何操作隆豹,但最好保證不要修改消息所屬的TopicPartition,否則會(huì)影響目標(biāo)分區(qū)的計(jì)算
  • onAcknowledgement(RecordMetadata, Exception):該方法會(huì)在消息從RecordAccumulator成功發(fā)送到Kafka Broker之后茅逮,或者在發(fā)送過程中失敗時(shí)調(diào)用
  • close:關(guān)閉Interceptor璃赡,主要用于執(zhí)行一些資源清理工作

攔截器案例

1、需求分析:

實(shí)現(xiàn)一個(gè)簡(jiǎn)單的雙Interceptor組成的攔截鏈献雅,第一個(gè)Interceptor會(huì)在消息發(fā)送前將時(shí)間戳信息加到消息value的最前部碉考,第二個(gè)Interceptor會(huì)在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)

2、編寫TimeInterceptor

package com.djm.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3惩琉、編寫CounterInterceptor

package com.djm.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private static long successCounter = 0L;

    private static long errorCounter = 0L;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

4豆励、修改CustomProducer

package com.djm.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.djm.kafka.interceptor.TimeInterceptor");  
        interceptors.add("com.djm.kafka.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success -> " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

5 Flume對(duì)接Kafka

1、配置Flume

編寫flume-kafka.conf

[djm@hadoop102 job]$ vim flume-kafka.conf

輸入一下內(nèi)容

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、啟動(dòng)消費(fèi)者

[djm@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

3良蒸、啟動(dòng)Flume

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4技扼、向/opt/module/datas/flume.log里追加數(shù)據(jù),查看Kafka消費(fèi)情況

6 Kafka監(jiān)控

6.1 Monitor

1嫩痰、上傳jarKafkaOffsetMonitor-assembly-0.4.6.jar到集群

2剿吻、在/opt/module/下創(chuàng)建kafka-offset-console文件夾

3、將上傳的jar包放入剛創(chuàng)建的目錄下

4串纺、在/opt/module/kafka-offset-console目錄下創(chuàng)建啟動(dòng)腳本start.sh丽旅,內(nèi)容如下:

#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \
--kafkaSecurityProtocol PLAINTEXT \
--zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &

5、在/opt/module/kafka-offset-console目錄下創(chuàng)建mobile-logs文件夾

6纺棺、啟動(dòng)Monitor

./start.sh

6.2 Manager

1榄笙、上傳壓縮包kafka-manager-1.3.3.15.zip到集群

2、解壓到/opt/module

3祷蝌、修改配置文件conf/application.conf

kafka-manager.zkhosts="kafka-manager-zookeeper:2181"

修改為:

kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"

4茅撞、啟動(dòng)kafka-manager

[djm@hadoop102 kafka-manager-1.3.3.15]$ bin/kafka-manager

5、登錄hadoop102:9000頁(yè)面查看詳細(xì)信息

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末巨朦,一起剝皮案震驚了整個(gè)濱河市米丘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌糊啡,老刑警劉巖拄查,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異棚蓄,居然都是意外死亡堕扶,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門癣疟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)挣柬,“玉大人,你說(shuō)我怎么就攤上這事睛挚⌒盎祝” “怎么了?”我有些...
    開封第一講書人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵扎狱,是天一觀的道長(zhǎng)侧到。 經(jīng)常有香客問我,道長(zhǎng)淤击,這世上最難降的妖魔是什么匠抗? 我笑而不...
    開封第一講書人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮污抬,結(jié)果婚禮上汞贸,老公的妹妹穿的比我還像新娘绳军。我一直安慰自己,他們只是感情好矢腻,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開白布门驾。 她就那樣靜靜地躺著,像睡著了一般多柑。 火紅的嫁衣襯著肌膚如雪奶是。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,950評(píng)論 1 291
  • 那天竣灌,我揣著相機(jī)與錄音聂沙,去河邊找鬼。 笑死初嘹,一個(gè)胖子當(dāng)著我的面吹牛及汉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播屯烦,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼豁生,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了漫贞?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤育叁,失蹤者是張志新(化名)和其女友劉穎迅脐,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體豪嗽,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡谴蔑,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了龟梦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片隐锭。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖计贰,靈堂內(nèi)的尸體忽然破棺而出钦睡,到底是詐尸還是另有隱情,我是刑警寧澤躁倒,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布荞怒,位于F島的核電站,受9級(jí)特大地震影響秧秉,放射性物質(zhì)發(fā)生泄漏褐桌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一象迎、第九天 我趴在偏房一處隱蔽的房頂上張望荧嵌。 院中可真熱鬧,春花似錦、人聲如沸啦撮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)逻族。三九已至蜻底,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間聘鳞,已是汗流浹背薄辅。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留抠璃,地道東北人站楚。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像搏嗡,于是被迫代替她去往敵國(guó)和親窿春。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容

  • 以下內(nèi)容部分翻譯至 http://kafka.apache.org/intro kafka介紹 我們認(rèn)為采盒,一個(gè)流處...
    若與閱讀 8,758評(píng)論 0 22
  • 大致可以通過上述情況進(jìn)行排除 1.kafka服務(wù)器問題 查看日志是否有報(bào)錯(cuò)旧乞,網(wǎng)絡(luò)訪問問題等。 2. kafka p...
    生活的探路者閱讀 7,580評(píng)論 0 10
  • Apache Kafka 入門 1.kafka簡(jiǎn)介和產(chǎn)生的背景 什么是 Kafka Kafka 是一款分布式消息發(fā)...
    阿粒_lxf閱讀 1,786評(píng)論 0 0
  • 一磅氨、KAFKA介紹 Kafka是一種分布式的尺栖,基于發(fā)布/訂閱的消系統(tǒng)。kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類...
    大饞愚閱讀 3,212評(píng)論 2 4
  • Kafka系列一- Kafka背景及架構(gòu)介紹 Kafka簡(jiǎn)介 Kafka是一種分布式的烦租,基于發(fā)布/訂閱的消息系統(tǒng)延赌。...
    raincoffee閱讀 2,196評(píng)論 0 22