Kafka 0.9+Zookeeper3.4.6集群搭建香璃、配置这难,新Client API的使用要點(diǎn),高可用性測(cè)試葡秒,以及各種坑

Kafka 0.9版本對(duì)java client的api做出了較大調(diào)整姻乓,本文主要總結(jié)了Kafka 0.9在集群搭建、高可用性眯牧、新API方面的相關(guān)過(guò)程和細(xì)節(jié)蹋岩,以及本人在安裝調(diào)試過(guò)程中踩出的各種坑。

關(guān)于Kafka的結(jié)構(gòu)学少、功能剪个、特點(diǎn)、適用場(chǎng)景等不再贅述了版确,直接進(jìn)入正文

Kafka 0.9集群安裝配置

操作系統(tǒng):CentOS 6.5

安裝Java環(huán)境

Zookeeper和Kafka的運(yùn)行都需要Java環(huán)境扣囊,所以先安裝JRE乎折,Kafka默認(rèn)使用G1垃圾回收器,如果不更改垃圾回收器如暖,官方推薦使用 7u51以上版本的JRE笆檀。如果你使用老版本的JRE,需要更改Kafka的啟動(dòng)腳本盒至,指定G1以外的垃圾回收器酗洒。

Java環(huán)境的安裝過(guò)程在此不贅述了。

Zookeeper集群搭建

Kafka依賴Zookeeper管理自身集群(Broker枷遂、Offset樱衷、Producer、Consumer等)酒唉,所以先要安裝 Zookeeper矩桂。自然,為了達(dá)到高可用的目的痪伦,Zookeeper自身也不能是單點(diǎn)侄榴,接下來(lái)就介紹如何搭建一個(gè)最小的Zookeeper集群(3個(gè) zk節(jié)點(diǎn))

此處選用Zookeeper的版本是3.4.6,此為Kafka0.9中推薦的Zookeeper版本网沾。

解壓

tar -xzvf zookeeper-3.4.6.tar.gz

進(jìn)入zookeeper的conf目錄癞蚕,將zoo_sample.cfg復(fù)制一份,命名為zoo.cfg辉哥,此即為Zookeeper的配置文件

cp zoo_sample.cfg zoo.cfg

編輯zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/data/zk/zk0/data
dataLogDir=/data/zk/zk0/logs
# the port at which the clients will connect
clientPort=2181
server.0=10.0.0.100:4001:4002
server.1=10.0.0.101:4001:4002
server.2=10.0.0.102:4001:4002

dataDir和dataLogDir的路徑需要在啟動(dòng)前創(chuàng)建好
clientPort為zookeeper的服務(wù)端口
server.0/1/2為zk集群中三個(gè)node的信息桦山,定義格式為hostname:port1:port2,其中port1是node間通信使用的端口醋旦,port2是node選舉使用的端口恒水,需確保三臺(tái)主機(jī)的這兩個(gè)端口都是互通的

在另外兩臺(tái)主機(jī)上執(zhí)行同樣的操作,安裝并配置zookeeper

分別在三臺(tái)主機(jī)的dataDir路徑下創(chuàng)建一個(gè)文件名為myid的文件饲齐,文件內(nèi)容為該zk節(jié)點(diǎn)的編號(hào)钉凌。例如在第一臺(tái)主機(jī)上建立的myid文件內(nèi)容是0,第二臺(tái)是1捂人。

接下來(lái)御雕,啟動(dòng)三臺(tái)主機(jī)上的zookeeper服務(wù):

bin/zkServer.sh start

3個(gè)節(jié)點(diǎn)都啟動(dòng)完成后,可依次執(zhí)行如下命令查看集群狀態(tài):

bin/zkServer.sh status

命令輸出如下:

Mode: leader 或 Mode: follower

3個(gè)節(jié)點(diǎn)中先慷,應(yīng)有1個(gè)leader和兩個(gè)follower

驗(yàn)證zookeeper集群高可用性:

假設(shè)目前3個(gè)zk節(jié)點(diǎn)中,server0為leader咨察,server1和server2為follower

我們停掉server0上的zookeeper服務(wù):

bin/zkServer.sh stop

再到server1和server2上查看集群狀態(tài)论熙,會(huì)發(fā)現(xiàn)此時(shí)server1(也有可能是server2)為leader,另一個(gè)為follower摄狱。

再次啟動(dòng)server0的zookeeper服務(wù)脓诡,運(yùn)行zkServer.sh status檢查无午,發(fā)現(xiàn)新啟動(dòng)的server0也為follower

至此,zookeeper集群的安裝和高可用性驗(yàn)證完成祝谚。

附:Zookeeper默認(rèn)會(huì)將控制臺(tái)信息輸出到啟動(dòng)路徑下的zookeeper.out中宪迟,顯然在生產(chǎn)環(huán)境中我們不能允許Zookeeper這樣做,通過(guò)如下方法交惯,可以讓Zookeeper輸出按尺寸切分的日志文件:

修改conf/log4j.properties文件次泽,將

zookeeper.root.logger=INFO, CONSOLE

改為

zookeeper.root.logger=INFO, ROLLINGFILE

修改bin/zkEnv.sh文件,將

ZOO_LOG4J_PROP="INFO,CONSOLE"

改為

ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

然后重啟zookeeper席爽,就ok了

Kafka集群搭建

此例中意荤,我們會(huì)安裝配置一個(gè)有兩個(gè)Broker組成的Kafka集群,并在其上創(chuàng)建一個(gè)兩個(gè)分區(qū)的Topic

本例中使用Kafka-0.9.0.1

解壓

tar -xzvf kafka_2.11-0.9.0.1.tgz

編輯config/server.properties文件只锻,下面列出關(guān)鍵的參數(shù)

#此Broker的ID玖像,集群中每個(gè)Broker的ID不可相同
broker.id=0
#監(jiān)聽器,端口號(hào)與port一致即可
listeners=PLAINTEXT://:9092
#Broker監(jiān)聽的端口
port=9092
#Broker的Hostname齐饮,填主機(jī)IP即可
host.name=10.0.0.100
#向Producer和Consumer建議連接的Hostname和port(此處有坑捐寥,具體見(jiàn)后)
advertised.host.name=10.0.0.100
advertised.port=9092
#進(jìn)行IO的線程數(shù),應(yīng)大于主機(jī)磁盤數(shù)
num.io.threads=8
#消息文件存儲(chǔ)的路徑
log.dirs=/data/kafka-logs
#消息文件清理周期祖驱,即清理x小時(shí)前的消息記錄
log.retention.hours=168
#每個(gè)Topic默認(rèn)的分區(qū)數(shù)握恳,一般在創(chuàng)建Topic時(shí)都會(huì)指定分區(qū)數(shù),所以這個(gè)配成1就行了
num.partitions=1
#Zookeeper連接串羹膳,此處填寫上一節(jié)中安裝的三個(gè)zk節(jié)點(diǎn)的ip和端口即可
zookeeper.connect=10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181

配置項(xiàng)的詳細(xì)說(shuō)明請(qǐng)見(jiàn)官方文檔:http://kafka.apache.org/documentation.html#brokerconfigs

此處的坑:

按照官方文檔的說(shuō)法睡互,advertised.host.name和advertised.port這兩個(gè)參數(shù)用于定義集群向Producer和 Consumer廣播的節(jié)點(diǎn)host和port,如果不定義的話陵像,會(huì)默認(rèn)使用host.name和port的定義就珠。但在實(shí)際應(yīng)用中,我發(fā)現(xiàn)如果不定義 advertised.host.name參數(shù)醒颖,使用Java客戶端從遠(yuǎn)端連接集群時(shí)妻怎,會(huì)發(fā)生連接超時(shí),拋出異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired
經(jīng)過(guò)debug發(fā)現(xiàn)泞歉,連接到集群是成功的逼侦,但連接到集群后更新回來(lái)的集群meta信息卻是錯(cuò)誤的:


20160406221953137.png

能夠看到,metadata中的Cluster信息腰耙,節(jié)點(diǎn)的hostname是iZ25wuzqk91Z這樣的一串?dāng)?shù)字榛丢,而不是實(shí)際的ip地址 10.0.0.100和101。iZ25wuzqk91Z其實(shí)是遠(yuǎn)端主機(jī)的hostname挺庞,這說(shuō)明在沒(méi)有配置advertised.host.name 的情況下晰赞,Kafka并沒(méi)有像官方文檔宣稱的那樣改為廣播我們配置的host.name,而是廣播了主機(jī)配置的hostname。遠(yuǎn)端的客戶端并沒(méi)有配置 hosts掖鱼,所以自然是連接不上這個(gè)hostname的然走。要解決這一問(wèn)題,需要正確配置客戶端主機(jī)的host戏挡,也可以把host.name和advertised.host.name都配置成絕對(duì)的ip地址

接下來(lái)芍瑞,我們?cè)诹硪慌_(tái)主機(jī)也完成Kafka的安裝和配置,然后在兩臺(tái)主機(jī)上分別啟動(dòng)Kafka:

bin/kafka-server-start.sh -daemon config/server.properties 

此處的坑:
官方給出的后臺(tái)啟動(dòng)kafka的方法是:

bin/kafka-server-start.sh config/server.properties & 

但用這種方式啟動(dòng)后褐墅,只要斷開Shell或登出拆檬,Kafka服務(wù)就會(huì)自動(dòng)shutdown,解決辦法是啟動(dòng)時(shí)添加-daemon參數(shù)掌栅。

接下來(lái)秩仆,我們創(chuàng)建一個(gè)名為test,擁有兩個(gè)分區(qū)猾封,兩個(gè)副本的Topic:

bin/kafka-topics.sh --create --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --replication-factor 2 --partitions 2 --topic test

創(chuàng)建完成后,使用如下命令查看Topic狀態(tài):

bin/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test

輸出:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
     Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
     Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1

解讀:test這個(gè)topic晌缘,當(dāng)前有2個(gè)分區(qū)磷箕,分別為0和1选酗,分區(qū)0的Leader是1(這個(gè)1是broker.id),分區(qū)0有兩個(gè) Replica(副本)芒填,分別是1和0空繁,這兩個(gè)副本中殿衰,Isr(In-sync)的是0和1。分區(qū)2的Leader是0盛泡,也有兩個(gè)Replica傲诵,同樣也 是兩個(gè)replica都是in-sync狀態(tài)

至此,Kafka 0.9集群的搭建工作就完成了悟衩,接下來(lái)我們將介紹新的Java API的使用栓拜,以及集群高可用性的驗(yàn)證測(cè)試。

使用Kafka的Producer API來(lái)完成消息的推送

  1. Kafka 0.9.0.1的java client依賴:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
  1. 寫一個(gè)KafkaUtil工具類,用于構(gòu)造Kafka Client
public class KafkaUtil {
    private static KafkaProducer<String, String> kp;

    public static KafkaProducer<String, String> getProducer() {
        if (kp == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
            props.put("acks", "1");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kp = new KafkaProducer<String, String>(props);
        }
        return kp;
    }
}

KafkaProducer<K,V>的K代表每條消息的key類型薛耻,V代表消息類型饼齿。消息的key用于決定此條消息由哪一個(gè)partition接收蝙搔,所以我們需要保證每條消息的key是不同的吃型。

Producer端的常用配置

  • bootstrap.servers:Kafka集群連接串勤晚,可以由多個(gè)host:port組成
  • acks:broker消息確認(rèn)的模式,有三種:
    0:不進(jìn)行消息接收確認(rèn)鸟蜡,即Client端發(fā)送完成后不會(huì)等待Broker的確認(rèn)
    1:由Leader確認(rèn)揉忘,Leader接收到消息后會(huì)立即返回確認(rèn)信息
    all:集群完整確認(rèn),Leader會(huì)等待所有in-sync的follower節(jié)點(diǎn)都確認(rèn)收到消息后端铛,再返回確認(rèn)信息
    我們可以根據(jù)消息的重要程度沦补,設(shè)置不同的確認(rèn)模式。默認(rèn)為1
  • retries:發(fā)送失敗時(shí)Producer端的重試次數(shù)虚倒,默認(rèn)為0
  • batch.size:當(dāng)同時(shí)有大量消息要向同一個(gè)分區(qū)發(fā)送時(shí)魂奥,Producer端會(huì)將消息打包后進(jìn)行批量發(fā)送耻煤。如果設(shè)置為0,則每條消息都獨(dú)立發(fā)送棺妓。默認(rèn)為16384字節(jié)
  • linger.ms:發(fā)送消息前等待的毫秒數(shù)怜跑,與batch.size配合使用性芬。在消息負(fù)載不高的情況下剧防,配置linger.ms能夠讓Producer在發(fā)送消息前等待一定時(shí)間峭拘,以積累更多的消息打包發(fā)送鸡挠,達(dá)到節(jié)省網(wǎng)絡(luò)資源的目的。默認(rèn)為0
  • key.serializer/value.serializer:消息key/value的序列器Class鞋囊,根據(jù)key和value的類型決定
  • buffer.memory:消息緩沖池大小溜腐。尚未被發(fā)送的消息會(huì)保存在Producer的內(nèi)存中挺益,如果消息產(chǎn)生的速度大于消息發(fā)送的速度乘寒,那么緩沖池滿后發(fā)送消息的請(qǐng)求會(huì)被阻塞伞辛。默認(rèn)33554432字節(jié)(32MB)

更多的Producer配置見(jiàn)官網(wǎng):http://kafka.apache.org/documentation.html#producerconfigs

  1. 寫一個(gè)簡(jiǎn)單的Producer端蚤氏,每隔1秒向Kafka集群發(fā)送一條消息:
public class KafkaTest {
    public static void main(String[] args) throws Exception{
        Producer<String, String> producer = KafkaUtil.getProducer();
        int i = 0;
        while(true) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null)
                        e.printStackTrace();
                    System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });
            i++;
            Thread.sleep(1000);
        }
    }
}

在調(diào)用KafkaProducer的send方法時(shí)竿滨,可以注冊(cè)一個(gè)回調(diào)方法,在Producer端完成發(fā)送后會(huì)觸發(fā)回調(diào)邏輯垫言,在回調(diào)方法的 metadata對(duì)象中骏掀,我們能夠獲取到已發(fā)送消息的offset和落在的分區(qū)等信息柱告。注意笑陈,如果acks配置為0涵妥,依然會(huì)觸發(fā)回調(diào)邏輯蓬网,只是拿不到 offset和消息落地的分區(qū)信息。

跑一下吵取,輸出是這樣的:

message send to partition 0, offset: 28
message send to partition 1, offset: 26
message send to partition 0, offset: 29
message send to partition 1, offset: 27
message send to partition 1, offset: 28
message send to partition 0, offset: 30
message send to partition 0, offset: 31
message send to partition 1, offset: 29
message send to partition 1, offset: 30
message send to partition 1, offset: 31
message send to partition 0, offset: 32
message send to partition 0, offset: 33
message send to partition 0, offset: 34
message send to partition 1, offset: 32

乍一看似乎offset亂掉了皮官,但其實(shí)這是因?yàn)橄⒎植荚诹藘蓚€(gè)分區(qū)上捺氢,每個(gè)分區(qū)上的offset其實(shí)是正確遞增的摄乒。

使用Kafka的Consumer API來(lái)完成消息的消費(fèi)

  1. 改造一下KafkaUtil類馍佑,加入Consumer client的構(gòu)造梨水。
public class KafkaUtil {
    private static KafkaProducer<String, String> kp;
    private static KafkaConsumer<String, String> kc;

    public static KafkaProducer<String, String> getProducer() {
        if (kp == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
            props.put("acks", "1");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kp = new KafkaProducer<String, String>(props);
        }
        return kp;
    }
    
    public static KafkaConsumer<String, String> getConsumer() {
        if(kc == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
            props.put("group.id", "1");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kc = new KafkaConsumer<String, String>(props);
        }
        return kc;
    }
}

同樣穷劈,我們介紹一下Consumer常用配置

  • bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含義一樣歇终,不再贅述
  • fetch.min.bytes:每次最小拉取的消息大衅滥(byte)奕短。Consumer會(huì)等待消息積累到一定尺寸后進(jìn)行批量拉取。默認(rèn)為1谬返,代表有一條就拉一條
  • max.partition.fetch.bytes:每次從單個(gè)分區(qū)中拉取的消息最大尺寸(byte)遣铝,默認(rèn)為1M
  • group.id:Consumer的group id酿炸,同一個(gè)group下的多個(gè)Consumer不會(huì)拉取到重復(fù)的消息涨冀,不同group下的Consumer則會(huì)保證拉取到每一條消息鹿鳖。注意栓辜,同一個(gè)group下的consumer數(shù)量不能超過(guò)分區(qū)數(shù)藕甩。
  • enable.auto.commit:是否自動(dòng)提交已拉取消息的offset狭莱。提交offset即視為該消息已經(jīng)成功被消費(fèi),該組下的Consumer無(wú)法再拉取到該消息(除非手動(dòng)修改offset)默怨。默認(rèn)為true
  • auto.commit.interval.ms:自動(dòng)提交offset的間隔毫秒數(shù)匙睹,默認(rèn)5000痕檬。

全部的Consumer配置見(jiàn)官方文檔:http://kafka.apache.org/documentation.html#newconsumerconfigs

  1. 編寫Consumer端:
public class KafkaTest {
    public static void main(String[] args) throws Exception{
        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
        consumer.subscribe(Arrays.asList("test"));
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
            }
        }
    }
}

運(yùn)行輸出:

fetched from partition 0, offset: 28, message: this is message0
fetched from partition 0, offset: 29, message: this is message2
fetched from partition 0, offset: 30, message: this is message5
fetched from partition 0, offset: 31, message: this is message6
fetched from partition 0, offset: 32, message: this is message10
fetched from partition 0, offset: 33, message: this is message11
fetched from partition 0, offset: 34, message: this is message12
fetched from partition 1, offset: 26, message: this is message1
fetched from partition 1, offset: 27, message: this is message3
fetched from partition 1, offset: 28, message: this is message4
fetched from partition 1, offset: 29, message: this is message7
fetched from partition 1, offset: 30, message: this is message8
fetched from partition 1, offset: 31, message: this is message9
fetched from partition 1, offset: 32, message: this is message13

說(shuō)明:

KafkaConsumer的poll方法即是從Broker拉取消息丘跌,在poll之前首先要用subscribe方法訂閱一個(gè)Topic闭树。
poll方法的入?yún)⑹抢〕瑫r(shí)毫秒數(shù)报辱,如果沒(méi)有新的消息可供拉取仰猖,consumer會(huì)等待指定的毫秒數(shù)饥侵,到達(dá)超時(shí)時(shí)間后會(huì)直接返回一個(gè)空的結(jié)果集躏升。
如果Topic有多個(gè)partition膨疏,KafkaConsumer會(huì)在多個(gè)partition間以輪詢方式實(shí)現(xiàn)負(fù)載均衡佃却。如果啟動(dòng)了多個(gè) Consumer線程窘俺,Kafka也能夠通過(guò)zookeeper實(shí)現(xiàn)多個(gè)Consumer間的調(diào)度瘤泪,保證同一組下的Consumer不會(huì)重復(fù)消費(fèi)消息对途。注 意实檀,Consumer數(shù)量不能超過(guò)partition數(shù),超出部分的Consumer無(wú)法拉取到任何數(shù)據(jù)儒喊。
可以看出怀愧,拉取到的消息并不是完全順序化的余赢,kafka只能保證一個(gè)partition內(nèi)的消息先進(jìn)先出妻柒,所以在跨partition的情況下举塔,消息的順序是沒(méi)有保證的央渣。
本例中采用的是自動(dòng)提交offset芽丹,Kafka client會(huì)啟動(dòng)一個(gè)線程定期將offset提交至broker拔第。假設(shè)在自動(dòng)提交的間隔內(nèi)發(fā)生故障(比如整個(gè)JVM進(jìn)程死掉)蚊俺,那么有一部分消息是會(huì)被 重復(fù)消費(fèi)的。要避免這一問(wèn)題肩钠,可使用手動(dòng)提交offset的方式价匠。構(gòu)造consumer時(shí)將enable.auto.commit設(shè)為false踩窖,并在代碼中用consumer.commitSync()來(lái)手動(dòng)提交洋腮。

如果不想讓kafka控制consumer拉取數(shù)據(jù)時(shí)在partition間的負(fù)載均衡啥供,也可以手工控制:

    public static void main(String[] args) throws Exception{
        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
        String topic = "test";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0, partition1));
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
            }
            consumer.commitSync();
        }
    }

使用consumer.assign()方法為consumer線程指定1個(gè)或多個(gè)partition伙狐。

此處的坑:

在測(cè)試中我發(fā)現(xiàn)贷屎,如果用手工指定partition的方法拉取消息唉侄,不知為何kafka的自動(dòng)提交offset機(jī)制會(huì)失效属划,必須使用手動(dòng)方式才能正確提交已消費(fèi)的消息offset同眯。

題外話:

在真正的應(yīng)用環(huán)境中嗽测,Consumer端將消息拉取下來(lái)后要做的肯定不止是輸出出來(lái)這么簡(jiǎn)單唠粥,在消費(fèi)消息時(shí)很有可能需要花掉更多的時(shí)間晤愧。1個(gè) Consumer線程消費(fèi)消息的速度很有可能是趕不上Producer產(chǎn)生消息的速度官份,所以我們不得不考慮Consumer端采用多線程模型來(lái)消費(fèi)消息舅巷。
然而KafkaConsumer并不是線程安全的钠右,多個(gè)線程操作同一個(gè)KafkaConsumer實(shí)例會(huì)出現(xiàn)各種問(wèn)題飒房,Kafka官方對(duì)于Consumer端的多線程處理給出的指導(dǎo)建議如下:

1. 每個(gè)線程都持有一個(gè)KafkaConsumer對(duì)象

好處:

  • 實(shí)現(xiàn)簡(jiǎn)單
  • 不需要線程間的協(xié)作狠毯,效率最高
  • 最容易實(shí)現(xiàn)每個(gè)Partition內(nèi)消息的順序處理

弊端:

  • 每個(gè)KafkaConsumer都要與集群保持一個(gè)TCP連接
  • 線程數(shù)不能超過(guò)Partition數(shù)
  • 每一batch拉取的數(shù)據(jù)量會(huì)變小嚼松,對(duì)吞吐量有一定影響

2. 解耦
1個(gè)Consumer線程負(fù)責(zé)拉取消息惜颇,數(shù)個(gè)Worker線程負(fù)責(zé)消費(fèi)消息

好處:

  • 可自由控制Worker線程的數(shù)量凌摄,不受Partition數(shù)量限制

弊端:

  • 消息消費(fèi)的順序無(wú)法保證
  • 難以控制手動(dòng)提交offset的時(shí)機(jī)

個(gè)人認(rèn)為第二種方式更加可取锨亏,consumer數(shù)不能超過(guò)partition數(shù)這個(gè)限制是很要命的器予,不可能為了提高Consumer消費(fèi)消息的效率而把Topic分成更多的partition乾翔,partition越多反浓,集群的高可用性就越低雷则。

Kafka集群高可用性測(cè)試

  1. 查看當(dāng)前Topic的狀態(tài):
/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test

輸出:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
   Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1

可以看到,partition0的leader是broker1月劈,parition1的leader是broker0

  1. 啟動(dòng)Producer向Kafka集群發(fā)送消息

輸出:

message send to partition 0, offset: 35
message send to partition 1, offset: 33
message send to partition 0, offset: 36
message send to partition 1, offset: 34
message send to partition 1, offset: 35
message send to partition 0, offset: 37
message send to partition 0, offset: 38
message send to partition 1, offset: 36
message send to partition 1, offset: 37
  1. 登錄SSH將broker0度迂,也就是partition 1的leader kill掉

再次查看Topic狀態(tài):

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
  Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
  Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1

可以看到,當(dāng)前parition0和parition1的leader都是broker1了
此時(shí)再去看Producer的輸出:

    [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with /10.0.0.100 disconnected
    java.net.ConnectException: Connection refused: no further information
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
        at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
        at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:745)
    [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 7 to Cluster(nodes = [Node(1, 10.0.0.101, 9092)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test, partition = 0, leader = 1, replicas = [1,], isr = [1,]])

能看到Producer端的DEBUG日志顯示與broker0的鏈接斷開了猜揪,此時(shí)Kafka立刻開始更新集群metadata惭墓,更新后的metadata表示broker1現(xiàn)在是兩個(gè)partition的leader,Producer進(jìn)程很快就恢復(fù)繼續(xù)運(yùn)行而姐,沒(méi)有漏發(fā)任何消息,能夠看出Kafka集群的故障切換機(jī)制還是很厲害的

  1. 我們?cè)侔裝roker0啟動(dòng)起來(lái)
bin/kafka-server-start.sh -daemon config/server.properties 

然后再次檢查Topic狀態(tài):

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
   Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0

我們看到,broker0啟動(dòng)起來(lái)了吭狡,并且已經(jīng)是in-sync狀態(tài)(注意Isr從1變成了1,0)尖殃,但此時(shí)兩個(gè)partition的leader還都是 broker1,也就是說(shuō)當(dāng)前broker1會(huì)承載所有的發(fā)送和拉取請(qǐng)求划煮。這顯然是不行的送丰,我們要讓集群恢復(fù)到負(fù)載均衡的狀態(tài)。

這時(shí)候弛秋,需要使用Kafka的選舉工具觸發(fā)一次選舉:

bin/kafka-preferred-replica-election.sh --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181

選舉完成后器躏,再次查看Topic狀態(tài):

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
   Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0

可以看到,集群重新回到了broker0掛掉之前的狀態(tài)

但此時(shí)蟹略,Producer端產(chǎn)生了異常:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

原因是Producer端在嘗試向broker1的parition0發(fā)送消息時(shí)登失,partition0的leader已經(jīng)切換成了broker0,所以消息發(fā)送失敗挖炬。

此時(shí)用Consumer去消費(fèi)消息揽浙,會(huì)發(fā)現(xiàn)消息的編號(hào)不連續(xù)了,確實(shí)漏發(fā)了一條消息意敛。這是因?yàn)槲覀冊(cè)跇?gòu)造Producer時(shí)設(shè)定了retries=0馅巷,所以在發(fā)送失敗時(shí)Producer端不會(huì)嘗試重發(fā)。

將retries改為3后再次嘗試草姻,會(huì)發(fā)現(xiàn)leader切換時(shí)再次發(fā)生了同樣的問(wèn)題钓猬,但Producer的重發(fā)機(jī)制起了作用,消息重發(fā)成功撩独,啟動(dòng)Consumer端檢查也證實(shí)了所有消息都發(fā)送成功了敞曹。

每次集群?jiǎn)吸c(diǎn)發(fā)生故障恢復(fù)后,都需要進(jìn)行重新選舉才能徹底恢復(fù)集群的leader分配综膀,如果嫌每次這樣做很麻煩澳迫,可以在broker的配置文件(即 server.properties)中配置auto.leader.rebalance.enable=true,這樣broker在啟動(dòng)后就會(huì)自動(dòng)進(jìn)行重新選舉

至此僧须,我們通過(guò)測(cè)試證實(shí)了集群出現(xiàn)單點(diǎn)故障和恢復(fù)的過(guò)程中纲刀,Producer端能夠保持正確運(yùn)轉(zhuǎn)项炼。接下來(lái)我們看一下Consumer端的表現(xiàn):

  1. 同時(shí)啟動(dòng)Producer進(jìn)程和Consumer進(jìn)程

此時(shí)Producer一邊在生產(chǎn)消息担平,Consumer一邊在消費(fèi)消息

  1. 把broker0干掉,觀察Consumer端的輸出

能看到锭部,在broker0掛掉后暂论,consumer也端產(chǎn)生了一系列INFO和WARN日志,但同Producer端一樣拌禾,若干秒后自動(dòng)恢復(fù)取胎,消息仍然是連續(xù)的,并未出現(xiàn)斷點(diǎn)。

  1. 再次把broker0啟動(dòng)闻蛀,并觸發(fā)重新選舉匪傍,然后觀察輸出:
    fetched from partition 0, offset: 418, message: this is message48
    fetched from partition 0, offset: 419, message: this is message49
    [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group 1 failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry
    [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483646 dead.
    [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed: This is not the correct coordinator for this group.
    fetched from partition 1, offset: 392, message: this is message50
    fetched from partition 0, offset: 420, message: this is message51

能看到,重選舉后Consumer端也輸出了一些日志觉痛,意思是在提交offset時(shí)發(fā)現(xiàn)當(dāng)前的調(diào)度器已經(jīng)失效了役衡,但很快就重新獲取了新的有效調(diào)度器,恢復(fù) 了offset的自動(dòng)提交薪棒,驗(yàn)證已提交offset的值也證明了offset提交并未因leader切換而發(fā)生錯(cuò)誤手蝎。

如上,我們也通過(guò)測(cè)試證實(shí)了Kafka集群出現(xiàn)單點(diǎn)故障時(shí)俐芯,Consumer端的功能正確性棵介。

至此,Kafka+Zookeeper集群的安裝配置吧史、高可用性驗(yàn)證邮辽、Java Client的使用介紹就結(jié)束了

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市贸营,隨后出現(xiàn)的幾起案子逆巍,更是在濱河造成了極大的恐慌,老刑警劉巖莽使,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锐极,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡芳肌,警方通過(guò)查閱死者的電腦和手機(jī)灵再,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)亿笤,“玉大人翎迁,你說(shuō)我怎么就攤上這事【谎Γ” “怎么了汪榔?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)肃拜。 經(jīng)常有香客問(wèn)我痴腌,道長(zhǎng),這世上最難降的妖魔是什么燃领? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任士聪,我火速辦了婚禮,結(jié)果婚禮上猛蔽,老公的妹妹穿的比我還像新娘剥悟。我一直安慰自己灵寺,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布区岗。 她就那樣靜靜地躺著略板,像睡著了一般。 火紅的嫁衣襯著肌膚如雪慈缔。 梳的紋絲不亂的頭發(fā)上蚯根,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音胀糜,去河邊找鬼颅拦。 笑死,一個(gè)胖子當(dāng)著我的面吹牛教藻,可吹牛的內(nèi)容都是我干的距帅。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼括堤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼碌秸!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起悄窃,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤讥电,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后轧抗,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體恩敌,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年横媚,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了纠炮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡灯蝴,死狀恐怖恢口,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情穷躁,我是刑警寧澤耕肩,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站问潭,受9級(jí)特大地震影響猿诸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜睦授,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一两芳、第九天 我趴在偏房一處隱蔽的房頂上張望摔寨。 院中可真熱鬧去枷,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至逗余,卻和暖如春特咆,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背录粱。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工腻格, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人啥繁。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓菜职,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親旗闽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子酬核,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 一、入門1适室、簡(jiǎn)介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,348評(píng)論 0 9
  • 一嫡意、Kafka簡(jiǎn)介 Kafka (科技術(shù)語(yǔ))。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)捣辆,它可以處理消費(fèi)者規(guī)...
    邊學(xué)邊記閱讀 1,732評(píng)論 0 14
  • Kafka史上最詳細(xì)原理總結(jié)分為上下兩部分蔬螟,承上啟下 Kafka史上最詳細(xì)原理總結(jié)上 Kafka史上最詳細(xì)原理總結(jié)...
    小波同學(xué)閱讀 22,162評(píng)論 1 115
  • 2017,6月7號(hào)星期三多云轉(zhuǎn)晴,和往常一樣又是一天汽畴,由于晚上回來(lái)的晚促煮,剛開始寫親子日記。呵呵整袁,女兒寫作業(yè)菠齿,我在一...
    A書香門第閱讀 282評(píng)論 0 0
  • 昨天看完了芳華這部影片,我的心里依然隱隱作痛坐昙。影片很感人也很現(xiàn)實(shí)绳匀。 男主角劉峰,可以說(shuō)是一個(gè)善良和單純的人炸客,正因?yàn)?..
    我們一起向前走閱讀 136評(píng)論 5 3