大數(shù)據(jù)02-Kafka最新版的基本開發(fā)

1比然、單節(jié)點(diǎn) 單 broker

1.1丈氓、ZK 的安裝

1、首先下載 ZK强法,解壓到/app万俗,配置下環(huán)境變量

2、進(jìn)入$ZOOKEEPER_HOME/conf拟烫,配置一份 zk.conf(從zoo_sample.cfg拷貝)

3该编、修改 data 的路徑為dataDir=/app/zookeeper-3.4.12/data,諸葛目錄要手動(dòng)構(gòu)建

4、啟動(dòng)服務(wù) zkServer.sh start

1.2硕淑、KafKa安裝

1课竣、下載 kafka_2.11-2.0.0,注意對(duì)應(yīng)的 Scala置媳,

2于樟、配置環(huán)境變量

1.3、配置server.properties

broker.id=0 集群的時(shí)候用拇囊,每個(gè) cluster 該 id 不同
listeners=PLAINTEXT://localhost:9092 默認(rèn)端口9092
host.name=localhost 當(dāng)前機(jī)器
log.dirs=/app/kafka_2.11-2.0.0/kafaka-logs kafaka 日志
zookeeper.connect=localhost:2181 ZK 地址

1.4迂曲、啟動(dòng)

kafka-server-start.sh $KAFKA_HOME/config/server.properties
jps查看進(jìn)程

1.5、創(chuàng)建 Topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello_topic

其中:
--zookeeper 指定 zk 地址
--replication-factor 指定副本數(shù)
--partitions 指定分區(qū)數(shù)
--topic 指定名稱

1.6寥袭、查看所有 Topic

kafka-topics.sh --list --zookeeper localhost:2181

查看狀態(tài)

kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello_topic
image

1.7路捧、產(chǎn)生消息

kafka-console-producer.sh --broker-list localhost:9092 --topic hello_topic

生成消息是送入 Topic 里面,這里需要指定--broker-list传黄,進(jìn)入阻塞模式

1.8杰扫、消費(fèi)消息

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello_topic --from-beginning

--from-beginning 表示從第一條消息開始
然后進(jìn)入阻塞狀態(tài)

1.9、調(diào)試

在生產(chǎn)的狀態(tài)下膘掰,發(fā)送消息章姓,然后在消費(fèi)的狀態(tài)下,可以看到消息正常消費(fèi)

2识埋、單節(jié)點(diǎn)多 broker

2.1 啟動(dòng) ZK

同上

2.2 配置多份 server.properties

cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties

修改其中的

#config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://localhost:9093
    log.dirs=/tmp/kafka-logs-1
 
#config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://localhost:9094
    log.dirs=/tmp/kafka-logs-2

2.3 后臺(tái)運(yùn)行

kafka-server-start.sh $KAFKA_HOME/config/server.properties &
kafka-server-start.sh $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh $KAFKA_HOME/config/server-2.properties &
jps查看

2.4 創(chuàng)建 Topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

2.5 查看這個(gè) Topic

kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
查看多 broker 的 Topic

leader 表示標(biāo)號(hào)是2的 broker 是主
replicas 表示副本是3個(gè)
Isr 表示活著的有broker

2.6 發(fā)送消息和消費(fèi)消息

kafka-console-producer.sh --broker-list PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094 --topic my-replicated-topic
kafka-console-consumer.sh --bootstrap-server PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094 --from-beginning --topic my-replicated-topic

2.7 分別停掉其中的 broker凡伊,進(jìn)行測(cè)試

3、使用 API

引入 pom

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>

生產(chǎn)者


import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class MyProducer {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.122:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //生產(chǎn)者發(fā)送消息 
        String topic = "my-replicated-topic";
        Producer<String, String> procuder = new KafkaProducer<String,String>(props);
        for (int i = 1; i <= 10; i++) {
            String value = "value_" + i;
            ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);
            procuder.send(msg);
        }
        //列出topic的相關(guān)信息
        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ;
        partitions = procuder.partitionsFor(topic);
        for(PartitionInfo p:partitions)
        {
            System.out.println(p);
        }

        System.out.println("send message over.");
        procuder.close(100,TimeUnit.MILLISECONDS);
    }
}

消費(fèi)者


import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.122:9092");       
        props.put("group.id", "test");//消費(fèi)者的組id
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //訂閱主題列表topic
        consumer.subscribe(Arrays.asList("my-replicated-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n");
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末窒舟,一起剝皮案震驚了整個(gè)濱河市系忙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌辜纲,老刑警劉巖笨觅,帶你破解...
    沈念sama閱讀 218,525評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拦耐,死亡現(xiàn)場(chǎng)離奇詭異耕腾,居然都是意外死亡见剩,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門扫俺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)苍苞,“玉大人,你說我怎么就攤上這事狼纬「牵” “怎么了?”我有些...
    開封第一講書人閱讀 164,862評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵疗琉,是天一觀的道長(zhǎng)冈欢。 經(jīng)常有香客問我审磁,道長(zhǎng)潮饱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,728評(píng)論 1 294
  • 正文 為了忘掉前任攒钳,我火速辦了婚禮柠贤,結(jié)果婚禮上香浩,老公的妹妹穿的比我還像新娘。我一直安慰自己臼勉,他們只是感情好邻吭,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,743評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著宴霸,像睡著了一般囱晴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瓢谢,一...
    開封第一講書人閱讀 51,590評(píng)論 1 305
  • 那天畸写,我揣著相機(jī)與錄音,去河邊找鬼恩闻。 笑死艺糜,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的幢尚。 我是一名探鬼主播破停,決...
    沈念sama閱讀 40,330評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼尉剩!你這毒婦竟也來(lái)了真慢?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,244評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤理茎,失蹤者是張志新(化名)和其女友劉穎黑界,沒想到半個(gè)月后管嬉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,693評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡朗鸠,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,885評(píng)論 3 336
  • 正文 我和宋清朗相戀三年蚯撩,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片烛占。...
    茶點(diǎn)故事閱讀 40,001評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡胎挎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出忆家,到底是詐尸還是另有隱情犹菇,我是刑警寧澤,帶...
    沈念sama閱讀 35,723評(píng)論 5 346
  • 正文 年R本政府宣布芽卿,位于F島的核電站揭芍,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏卸例。R本人自食惡果不足惜称杨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,343評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望币厕。 院中可真熱鬧列另,春花似錦、人聲如沸旦装。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)阴绢。三九已至店乐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間呻袭,已是汗流浹背眨八。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留左电,地道東北人廉侧。 一個(gè)月前我還...
    沈念sama閱讀 48,191評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像篓足,于是被迫代替她去往敵國(guó)和親段誊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,955評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一栈拖、入門1连舍、簡(jiǎn)介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,348評(píng)論 0 9
  • kafka三大特性 第一、發(fā)布和訂閱 第二涩哟、實(shí)時(shí)的流處理 第三索赏、安全地存儲(chǔ)流數(shù)據(jù)在集群節(jié)點(diǎn)上 kafka的架構(gòu) F...
    機(jī)靈鬼鬼閱讀 1,136評(píng)論 0 0
  • kafka安裝目錄下的bin目錄包含了很多運(yùn)維可操作的shell腳本盼玄,列舉如下: 接下來(lái)詳細(xì)說明每個(gè)腳本的使用方法...
    阿飛的博客閱讀 9,773評(píng)論 5 15
  • 什么是消息系統(tǒng)? 早期兩個(gè)應(yīng)用程序間進(jìn)行消息傳遞需要保證兩個(gè)應(yīng)用程序同時(shí)在線潜腻,并且耦合度很高埃儿。為了解決應(yīng)用程序不在...
    Java小鋪閱讀 1,214評(píng)論 0 2
  • 1介紹 Kafka是一個(gè)分布式的、可分區(qū)的砾赔、可復(fù)制的消息系統(tǒng)蝌箍,提供了一個(gè)生產(chǎn)者青灼、緩沖區(qū)暴心、消費(fèi)者的模型。 Kafka...
    蟲兒飛ZLEI閱讀 631評(píng)論 0 1