『互聯(lián)網(wǎng)架構(gòu)』kafka集群搭建和使用(117)

原創(chuàng)文章,歡迎轉(zhuǎn)載。轉(zhuǎn)載請注明:轉(zhuǎn)載自IT人故事會,謝謝晕拆!
原文鏈接地址:『互聯(lián)網(wǎng)架構(gòu)』kafka集群原理(117)

之前主要是理論說了kafka的原理,kafka相關(guān)的三個比較重要的配置文件server材蹬,consumer实幕,Producer的詳細(xì)配置,以及kafka消息的存儲形式堤器,主要是保存在zookeeper上昆庇。應(yīng)該按照之前的文檔單實例的kafka都搭建成功了。這次主要說說集群的搭建闸溃。
源碼:https://github.com/limingios/netFuture/tree/master/源碼/『互聯(lián)網(wǎng)架構(gòu)』kafka集群搭建和使用(117)

(一)kafka集群的搭建

  • 查看主題
cd /opt/kafka_2.12-2.2.1
bin/kafka-topics.sh --list --zookeeper localhost:2181
#__consumer_offsets 記錄偏移量的
# test 主題的名稱
  • 搭建集群

單個節(jié)點掛了就掛了整吆,為了讓項目高可用必須搭建多節(jié)點。在生產(chǎn)環(huán)境肯定不能使用單節(jié)點肯定是使用多節(jié)點辉川。到目前為止表蝙,我們都是在一個單節(jié)點上運(yùn)行broker,這并沒有什么意思乓旗。對于kafka來說府蛇,一個單獨的broker意味著kafka集群中只有一個接點。要想增加kafka集群中的節(jié)點數(shù)量寸齐,只需要多啟動幾個broker實例即可欲诺。為了有更好的理解,現(xiàn)在我們在一臺機(jī)器上同時啟動三個broker實例渺鹦,搭建偽分布扰法。其實搭建多臺也是一樣的。

首先毅厚,我們需要建立好其他2個broker的配置文件

cd /opt/kafka_2.12-2.2.1
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
配置文件的內(nèi)容分別如下:

config/server-1.properties

vi config/server-1.properties
broker.id=1
#注釋放開
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:

vi config/server-2.properties
broker.id=2
#注釋放開
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

broker.id屬性在kafka集群中必須要是唯一的塞颁。我們需要重新指定port和log目錄,因為我們是在同一臺機(jī)器上運(yùn)行多個實例。如果不進(jìn)行修改的話祠锣,

目前我們已經(jīng)有一個zookeeper實例和一個broker實例在運(yùn)行了酷窥,現(xiàn)在我們只需要在啟動2個broker實例。

cd /opt/kafka_2.12-2.2.1
bin/kafka-server-start.sh config/server-1.properties &
cd /opt/kafka_2.12-2.2.1
bin/kafka-server-start.sh config/server-2.properties &
  • 創(chuàng)建單分區(qū)主題:備份因子設(shè)置為3伴网,因為有3個節(jié)點的集群蓬推,不允許設(shè)置大概3的。
 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看集群的主題

bin/kafka-topics.sh --list --zookeeper localhost:2181

現(xiàn)在已經(jīng)有了集群澡腾,并且創(chuàng)建了一個3個備份因子的topic沸伏,但是到底是哪一個broker在為這個topic提供服務(wù)呢(因為我們只有一個分區(qū),所以肯定同時只有一個broker在處理這個topic)动分?

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic: 主題的名稱
PartitionCount: 因為創(chuàng)建的時候就創(chuàng)建了一個分區(qū)毅糟,目前顯示1
ReplicationFactor: 備份因子是3個
Partition:分區(qū)在這個主題的編號
Leader:編號為1的broker.id,這個主題對外提供讀寫的節(jié)點的是編號為1的節(jié)點澜公。
Replicas:副本編號1姆另,2,0
Isr:已經(jīng)同步的副本1坟乾,2迹辐,0

  • 刪除一個Leader節(jié)點查看描述
#通過配置文件找到對應(yīng)的進(jìn)程id
 ps -ef | grep server-1.pro 

kill -9 3221
#剩余2個kafka
jps
#刪除了broker.id=1
 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic  

刪除了broker.id=1的節(jié)點,剩余2個節(jié)點0和2糊渊,進(jìn)行選舉leader右核。目前的leader變成了2,副本還是3個渺绒,活著已同步的節(jié)點沒有1了贺喝。

  • 創(chuàng)建多分區(qū)主題:備份因子設(shè)置為2,重新啟動broker.id=1宗兼,有3個節(jié)點的集群躏鱼,分區(qū)設(shè)置2。
jps
bin/kafka-server-start.sh config/server-1.properties & 
jps
# 創(chuàng)建新主題
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic my-test2
# 查看主題列表
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看主題的情況my-test2殷绍,2個分區(qū)染苛,2個備份因子。2個分區(qū)每個分區(qū)有個leader主到。一定要明白leader是分區(qū)的leader茶行,不是節(jié)點的leader。

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test2
  • 單播消費

一條消息只能被某一個消費者消費的模式登钥,類似queue模式畔师,只需讓所有消費者在同一個消費組里即可

分別在兩個客戶端執(zhí)行如下消費命令,然后往主題里發(fā)送消息牧牢,結(jié)果只有一個客戶端能收到消息

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
  • 多播消費

一條消息能被多個消費者消費的模式看锉,類似publish-subscribe模式費姿锭,針對Kafka同一條消息只能被同一個消費組下的某一個消費者消費的特性,要實現(xiàn)多播只要保證這些消費者屬于不同的消費組即可伯铣。我們再增加一個消費者呻此,該消費者屬于testGroup-2消費組,結(jié)果兩個客戶端都能收到消息腔寡。如果2個消費者都屬于一個消費組焚鲜,只能有一個收到。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup-2 --topic test

(二)kafka-java客戶端調(diào)用

  • 官方文檔

http://kafka.apache.org/documentation/#api

  • host文件中加入kafka的host
  • 消費者類
package com.idig8.kafka.kafkaDemo;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MsgConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.101:9092");
        // 消費分組名
        props.put("group.id", "testGroup");
        // 是否自動提交offset
        //props.put("enable.auto.commit", "true");
        // 自動提交offset的間隔時間
        //props.put("auto.commit.interval.ms", "1000");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 消費主題
        consumer.subscribe(Arrays.asList("test"));
        // 消費指定分區(qū)
        //consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
        while (true) {
            /*
             * poll() API 主要是判斷consumer是否還活著蹬蚁,只要我們持續(xù)調(diào)用poll()恃泪,消費者就會存活在自己所在的group中郑兴,
             * 并且持續(xù)的消費指定partition的消息犀斋。底層是這么做的:消費者向server持續(xù)發(fā)送心跳,如果一個時間段(session.
             * timeout.ms)consumer掛掉或是不能發(fā)送心跳情连,這個消費者會被認(rèn)為是掛掉了叽粹,
             * 這個Partition也會被重新分配給其他consumer
             */
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            
            if (records.count() > 0) { 
                // 提交offset 
                consumer.commitSync(); 
            }
             
        }
    }
}

  • 生產(chǎn)者,分為同步和異步兩種方式
package com.idig8.kafka.kafkaDemo;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MsgProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.101:9092,192.168.80.101:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 5; i++) {
            //同步方式發(fā)送消息
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i));
            /*Future<RecordMetadata> result = producer.send(producerRecord);
            //等待消息發(fā)送成功的同步阻塞方法
            RecordMetadata metadata = result.get();
            System.out.println("同步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());*/

            //異步方式發(fā)送消息
            producer.send(producerRecord, new Callback() {

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("發(fā)送消息失斎匆ā:" + exception.getStackTrace());
                    }
                    if (metadata != null) {
                        System.out.println("異步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                }
            });
        }

        producer.close();
    }
}

  • pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tuling.kafka</groupId>
    <artifactId>kafkaDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafkaDemo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- 由于新版的客戶端沒有引入日志框架實現(xiàn)的依賴虫几,所以我們要自己引入 -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

(三)kafka的選舉一個圖足夠了

PS:kafka消息不會丟失,只會定期刪除挽拔。java源碼不太負(fù)責(zé)辆脸,直接看官網(wǎng)的api就可以了。消費的方式是通過偏移量來進(jìn)行的螃诅。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末啡氢,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子术裸,更是在濱河造成了極大的恐慌倘是,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件袭艺,死亡現(xiàn)場離奇詭異搀崭,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)猾编,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進(jìn)店門瘤睹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人答倡,你說我怎么就攤上這事轰传。” “怎么了苇羡?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵绸吸,是天一觀的道長鼻弧。 經(jīng)常有香客問我,道長锦茁,這世上最難降的妖魔是什么攘轩? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮码俩,結(jié)果婚禮上度帮,老公的妹妹穿的比我還像新娘。我一直安慰自己稿存,他們只是感情好笨篷,可當(dāng)我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著瓣履,像睡著了一般率翅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上袖迎,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天冕臭,我揣著相機(jī)與錄音,去河邊找鬼燕锥。 笑死辜贵,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的归形。 我是一名探鬼主播托慨,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼暇榴!你這毒婦竟也來了厚棵?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤跺撼,失蹤者是張志新(化名)和其女友劉穎窟感,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體歉井,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡柿祈,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了哩至。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片躏嚎。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖菩貌,靈堂內(nèi)的尸體忽然破棺而出卢佣,到底是詐尸還是另有隱情,我是刑警寧澤箭阶,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布虚茶,位于F島的核電站戈鲁,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏嘹叫。R本人自食惡果不足惜婆殿,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望罩扇。 院中可真熱鬧婆芦,春花似錦、人聲如沸喂饥。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽员帮。三九已至或粮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間集侯,已是汗流浹背被啼。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留棠枉,地道東北人。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓泡挺,卻偏偏與公主長得像辈讶,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子娄猫,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容