原創(chuàng)文章,歡迎轉(zhuǎn)載。轉(zhuǎn)載請注明:轉(zhuǎn)載自IT人故事會,謝謝晕拆!
原文鏈接地址:『互聯(lián)網(wǎng)架構(gòu)』kafka集群原理(117)
之前主要是理論說了kafka的原理,kafka相關(guān)的三個比較重要的配置文件server材蹬,consumer实幕,Producer的詳細(xì)配置,以及kafka消息的存儲形式堤器,主要是保存在zookeeper上昆庇。應(yīng)該按照之前的文檔單實例的kafka都搭建成功了。這次主要說說集群的搭建闸溃。
源碼:https://github.com/limingios/netFuture/tree/master/源碼/『互聯(lián)網(wǎng)架構(gòu)』kafka集群搭建和使用(117)
(一)kafka集群的搭建
- 查看主題
cd /opt/kafka_2.12-2.2.1
bin/kafka-topics.sh --list --zookeeper localhost:2181
#__consumer_offsets 記錄偏移量的
# test 主題的名稱
- 搭建集群
單個節(jié)點掛了就掛了整吆,為了讓項目高可用必須搭建多節(jié)點。在生產(chǎn)環(huán)境肯定不能使用單節(jié)點肯定是使用多節(jié)點辉川。到目前為止表蝙,我們都是在一個單節(jié)點上運(yùn)行broker,這并沒有什么意思乓旗。對于kafka來說府蛇,一個單獨的broker意味著kafka集群中只有一個接點。要想增加kafka集群中的節(jié)點數(shù)量寸齐,只需要多啟動幾個broker實例即可欲诺。為了有更好的理解,現(xiàn)在我們在一臺機(jī)器上同時啟動三個broker實例渺鹦,搭建偽分布扰法。其實搭建多臺也是一樣的。
首先毅厚,我們需要建立好其他2個broker的配置文件
cd /opt/kafka_2.12-2.2.1
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
配置文件的內(nèi)容分別如下:
config/server-1.properties
vi config/server-1.properties
broker.id=1
#注釋放開
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
vi config/server-2.properties
broker.id=2
#注釋放開
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id屬性在kafka集群中必須要是唯一的塞颁。我們需要重新指定port和log目錄,因為我們是在同一臺機(jī)器上運(yùn)行多個實例。如果不進(jìn)行修改的話祠锣,
目前我們已經(jīng)有一個zookeeper實例和一個broker實例在運(yùn)行了酷窥,現(xiàn)在我們只需要在啟動2個broker實例。
cd /opt/kafka_2.12-2.2.1
bin/kafka-server-start.sh config/server-1.properties &
cd /opt/kafka_2.12-2.2.1
bin/kafka-server-start.sh config/server-2.properties &
- 創(chuàng)建單分區(qū)主題:備份因子設(shè)置為3伴网,因為有3個節(jié)點的集群蓬推,不允許設(shè)置大概3的。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看集群的主題
bin/kafka-topics.sh --list --zookeeper localhost:2181
現(xiàn)在已經(jīng)有了集群澡腾,并且創(chuàng)建了一個3個備份因子的topic沸伏,但是到底是哪一個broker在為這個topic提供服務(wù)呢(因為我們只有一個分區(qū),所以肯定同時只有一個broker在處理這個topic)动分?
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic: 主題的名稱
PartitionCount: 因為創(chuàng)建的時候就創(chuàng)建了一個分區(qū)毅糟,目前顯示1
ReplicationFactor: 備份因子是3個
Partition:分區(qū)在這個主題的編號
Leader:編號為1的broker.id,這個主題對外提供讀寫的節(jié)點的是編號為1的節(jié)點澜公。
Replicas:副本編號1姆另,2,0
Isr:已經(jīng)同步的副本1坟乾,2迹辐,0
- 刪除一個Leader節(jié)點查看描述
#通過配置文件找到對應(yīng)的進(jìn)程id
ps -ef | grep server-1.pro
kill -9 3221
#剩余2個kafka
jps
#刪除了broker.id=1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
刪除了broker.id=1的節(jié)點,剩余2個節(jié)點0和2糊渊,進(jìn)行選舉leader右核。目前的leader變成了2,副本還是3個渺绒,活著已同步的節(jié)點沒有1了贺喝。
- 創(chuàng)建多分區(qū)主題:備份因子設(shè)置為2,重新啟動broker.id=1宗兼,有3個節(jié)點的集群躏鱼,分區(qū)設(shè)置2。
jps
bin/kafka-server-start.sh config/server-1.properties &
jps
# 創(chuàng)建新主題
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic my-test2
# 查看主題列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看主題的情況my-test2殷绍,2個分區(qū)染苛,2個備份因子。2個分區(qū)每個分區(qū)有個leader主到。一定要明白leader是分區(qū)的leader茶行,不是節(jié)點的leader。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test2
- 單播消費
一條消息只能被某一個消費者消費的模式登钥,類似queue模式畔师,只需讓所有消費者在同一個消費組里即可
分別在兩個客戶端執(zhí)行如下消費命令,然后往主題里發(fā)送消息牧牢,結(jié)果只有一個客戶端能收到消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
- 多播消費
一條消息能被多個消費者消費的模式看锉,類似publish-subscribe模式費姿锭,針對Kafka同一條消息只能被同一個消費組下的某一個消費者消費的特性,要實現(xiàn)多播只要保證這些消費者屬于不同的消費組即可伯铣。我們再增加一個消費者呻此,該消費者屬于testGroup-2消費組,結(jié)果兩個客戶端都能收到消息腔寡。如果2個消費者都屬于一個消費組焚鲜,只能有一個收到。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup-2 --topic test
(二)kafka-java客戶端調(diào)用
- 官方文檔
- host文件中加入kafka的host
- 消費者類
package com.idig8.kafka.kafkaDemo;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class MsgConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.80.101:9092");
// 消費分組名
props.put("group.id", "testGroup");
// 是否自動提交offset
//props.put("enable.auto.commit", "true");
// 自動提交offset的間隔時間
//props.put("auto.commit.interval.ms", "1000");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 消費主題
consumer.subscribe(Arrays.asList("test"));
// 消費指定分區(qū)
//consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
while (true) {
/*
* poll() API 主要是判斷consumer是否還活著蹬蚁,只要我們持續(xù)調(diào)用poll()恃泪,消費者就會存活在自己所在的group中郑兴,
* 并且持續(xù)的消費指定partition的消息犀斋。底層是這么做的:消費者向server持續(xù)發(fā)送心跳,如果一個時間段(session.
* timeout.ms)consumer掛掉或是不能發(fā)送心跳情连,這個消費者會被認(rèn)為是掛掉了叽粹,
* 這個Partition也會被重新分配給其他consumer
*/
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
if (records.count() > 0) {
// 提交offset
consumer.commitSync();
}
}
}
}
- 生產(chǎn)者,分為同步和異步兩種方式
package com.idig8.kafka.kafkaDemo;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class MsgProducer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.80.101:9092,192.168.80.101:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 5; i++) {
//同步方式發(fā)送消息
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i));
/*Future<RecordMetadata> result = producer.send(producerRecord);
//等待消息發(fā)送成功的同步阻塞方法
RecordMetadata metadata = result.get();
System.out.println("同步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());*/
//異步方式發(fā)送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("發(fā)送消息失斎匆ā:" + exception.getStackTrace());
}
if (metadata != null) {
System.out.println("異步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
}
producer.close();
}
}
- pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tuling.kafka</groupId>
<artifactId>kafkaDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafkaDemo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
<!-- 由于新版的客戶端沒有引入日志框架實現(xiàn)的依賴虫几,所以我們要自己引入 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
(三)kafka的選舉一個圖足夠了
PS:kafka消息不會丟失,只會定期刪除挽拔。java源碼不太負(fù)責(zé)辆脸,直接看官網(wǎng)的api就可以了。消費的方式是通過偏移量來進(jìn)行的螃诅。