【Kafka】Kafka入門手記

1. 前言

本文為 Kafka 入門筆記努潘,主要包括 Kafka 單節(jié)點(diǎn)部署、生產(chǎn)消費(fèi)消息,以及新手踩坑記錄疯坤。

Kafka 作為大數(shù)據(jù)必備組件报慕、消息中間件必學(xué)的 Apache 頂級(jí)開源項(xiàng)目,服務(wù)穩(wěn)定贴膘、高吞吐的流數(shù)據(jù)處理平臺(tái)卖子。具體介紹可查看文末參考文檔略号。

目前 簡(jiǎn)書 暫不支持 markdown 收縮語句塊刑峡,對(duì)于文章展示方面略有問題

1.1. 可查看美觀版本

文檔:Kafka 入門手記.md
鏈接:http://note.youdao.com/noteshare?id=d1c65daf3c137f29a860b5efd5dff944&sub=F4E6A5E5FB3946499E7C4C3E6022E1DC

2. Kafka單節(jié)點(diǎn)部署

2.1. 下載

http://kafka.apache.org

選擇合適版本即可,這里選擇最新版本玄柠。Linux 環(huán)境突梦。

2.2. 解壓

Kafka 安裝包 后綴為 .tgz , 解壓即可羽利。

tar -zxvf kafka_package.tgz

其中宫患,kafka_package.tgz 是 Kafka 安裝包名稱。

2.2.1. zookeeper 安裝

Kafka 依賴于 zookeeper(ZK) 支持这弧,本文直接采用 Kafka 安裝包自帶的 zookeeper娃闲。

也可以單獨(dú)部署 zookeeper,使用方式一樣匾浪。對(duì)于 生產(chǎn)環(huán)境皇帮,建議單獨(dú)搭建 zookeeper。

2.3. 配置

2.3.1. zookeeper

進(jìn)入 Kafka 解壓包根目錄蛋辈,config 文件夾下的即為 Kafka 提供的默認(rèn)配置文件属拾。

此處我們修改下 zookeeper.propertieshost 信息。此處修改 host 配置冷溶,主要是為了避免在使用 Java 客戶端連接解析為 localhost

host.name=your_ip
advertised.host.name=your_ip

其中渐白,your_ip 配置為服務(wù)器外網(wǎng) IP。

2.3.2. Kafka

同樣在 config 配置文件下修改 server.properties 文件即可逞频。

主要配置如下:

# 監(jiān)聽
listeners=PLAINTEXT://服務(wù)器內(nèi)網(wǎng)IP:9092

# 以下兩項(xiàng)類似 zookeeper 配置
advertised.listeners=PLAINTEXT://服務(wù)器外網(wǎng)IP:9092
host.name=外網(wǎng)IP

# zookeeper 連接信息
zookeeper.connect=zookeeper服務(wù)器IP:2181

2.4. 啟動(dòng)

先啟動(dòng) zookeeper纯衍,因?yàn)?Kafka 啟動(dòng)的時(shí)候會(huì)連接注冊(cè) zookeeper。

2.4.1. zookeeper

啟動(dòng) zookeeper苗胀,在 Kafka 根目錄執(zhí)行

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

上述方式襟诸,會(huì)占有 shell 客戶端窗口,如果想后臺(tái)啟動(dòng)柒巫,添加參數(shù) daemon 即可

./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

2.4.2. kafka

類似 zookeeper 啟動(dòng)励堡。

./bin/kafka-server-start.sh ./config/server.properties

后臺(tái)啟動(dòng):

./bin/kafka-server-start.sh -daemon ./config/server.properties

此時(shí) Kafka 單節(jié)點(diǎn)部署就已經(jīng)完成了,通過 ps -ef | grep zookeeper, ps -ef | grep kafka 看到對(duì)應(yīng)進(jìn)程堡掏,證明啟動(dòng)成功应结。

2.5. topic

2.5.1. 創(chuàng)建 topic

通過 Kafka 提供的腳本文件,即可創(chuàng)建。在 Kafka 根目錄下執(zhí)行:

./config/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic_name

1)指定 zk 為 localhost:2181
2)副本因子為 1鹅龄,即不需要副本
3)partition 數(shù)量為 3
4)topic 名稱為 top_name

2.5.2. 查看 topic

./config/kafka-topics.sh --list --zookeeper localhost:2181
  1. list 命令用于查看
    2)需要指定 zk

2.6. 生產(chǎn)消費(fèi)

此處直接通過 Kafka 提供的簡(jiǎn)單客戶端進(jìn)行生產(chǎn)消費(fèi)數(shù)據(jù)揩慕。

2.6.1. 生產(chǎn)

1、啟動(dòng)簡(jiǎn)單 producer

./config/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name

1) --broker-list 指定 Kafka 的地址及端口
2)--topic 指定具體 topic_name

2扮休、 生產(chǎn)消息

直接在 producer 窗口輸入消息即可迎卤,消息是否發(fā)送成功,直接在 consumer 窗口即可查看玷坠。

2.6.2. 消費(fèi)

1蜗搔、啟動(dòng)簡(jiǎn)單 consumer

./config/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_name --from-beginning

1)--bootstrap-server 指定 Kafka 地址及端口
2)-- topic 指定 topic
3)--from-beginning 表示指定從 offset 從頭開始消費(fèi)

2、消費(fèi)數(shù)據(jù)

直接在 producer 窗口發(fā)送消息八堡,然后切換至 consumer 窗口樟凄,查看是否成功消費(fèi)消息。

3. Java Client 生產(chǎn)消費(fèi)

此步基于 SpringBoot 進(jìn)行搭建 demo 項(xiàng)目兄渺。 SpringBoot 版本為 2.x

3.1. 新建 SpringBoot 項(xiàng)目

直接通過 spring.starter 創(chuàng)建即可缝龄。完整項(xiàng)目: lambochen/demo/kafka

3.1.1. 引入依賴

Kafka 依賴:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.2. Kafka 配置

3.2.1. producer

1、application.properties 配置

kafka.producer.servers=kafka服務(wù)器IP:服務(wù)器端口號(hào)
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

kafka.topic.default=topic名稱

2挂谍、ProducerFactory, KafkaTemplate 配置:

public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
    props.put(ProducerConfig.LINGER_MS_CONFIG,linger);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
    return props;
}

public ProducerFactory<String, MessageEntity> producerFactory(){
    return new DefaultKafkaProducerFactory<>(
            producerConfigs(),
            new StringSerializer(),
            new JsonSerializer<MessageEntity>());
}

@Bean("kafkaTemplate")
public KafkaTemplate<String, MessageEntity> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

3.2.2. consumer

1叔壤、application.properties 配置

kafka.consumer.zookeeper.connect=ZK服務(wù)器端口:ZK端口
kafka.consumer.servers=Kafka服務(wù)器IP:Kafka端口
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=topic名稱
kafka.consumer.group.id=consumerGroup名稱
kafka.consumer.concurrency=10

2、KafkaListenerContainerFactory 配置

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>>
kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}

private ConsumerFactory<String, MessageEntity> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(MessageEntity.class)
    );
}

private Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    return propsMap;
}

3.2.3. ProducerCallBack

生產(chǎn)回調(diào)口叙,主要用于生產(chǎn)者發(fā)送消息后的處理炼绘。此 demo 僅作日志記錄。

需要集成 ListenableFutureCallback 庐扫,并指定消息實(shí)體類型饭望。

public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>> 

其中, MessageEntity 為消息實(shí)體類型形庭。

3.3. 生產(chǎn)消息

創(chuàng)建 ProducerRecord 消息铅辞,通過 KafkaTemplate 發(fā)送即可。

@Autowired
@Qualifier("kafkaTemplate")
private KafkaTemplate<String, MessageEntity> kafkaTemplate;

public void send(String topic, MessageEntity message) {
    kafkaTemplate.send(topic, message);
}

public void send(String topic, String key, MessageEntity message) {
    ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(topic, key, message);
    long startTime = System.currentTimeMillis();
    ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
    future.addCallback(new ProducerCallback(startTime, key, message));

}

3.4. 消費(fèi)消息

消費(fèi)消息萨醒,通過 @KafkaConsumer 注解即可實(shí)現(xiàn)斟珊。

@KafkaListener(topics = "${kafka.topic.default}", containerFactory = "kafkaListenerContainerFactory")
public void consumer(MessageEntity message){
    log.info("consumer: " + gson.toJson(message));
}

3.5. 測(cè)試

啟動(dòng) SpringBoot 項(xiàng)目,通過提供的 controller 進(jìn)行請(qǐng)求生產(chǎn)消息富纸。

查看日志囤踩,成功記錄消息內(nèi)容,即為生產(chǎn)晓褪、消費(fèi)成功。

到此為止涣仿,Kafka demo 應(yīng)用已完成啦

4. 踩坑記錄

4.1. 打印日志

我在剛開始建好項(xiàng)目勤庐、配置 Kafka 后示惊,啟動(dòng)項(xiàng)目失敗,無日志輸出愉镰,不好排查問題米罚。

設(shè)置日志 level, application.properties 文件配置:

logging.level.root=debug

4.2. kafka 啟動(dòng)內(nèi)存不足

kafka 啟動(dòng) 報(bào)錯(cuò)cannot allocate memory丈探,即內(nèi)存不足

4.3. java client 連接失敗

按照本教程配置录择,已經(jīng)避免了這個(gè)問題。

【kafka】Java連接出現(xiàn)Connection refused: no further information

5. 參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末碗降,一起剝皮案震驚了整個(gè)濱河市隘竭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌遗锣,老刑警劉巖货裹,帶你破解...
    沈念sama閱讀 217,509評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異精偿,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)赋兵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門笔咽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人霹期,你說我怎么就攤上這事叶组。” “怎么了历造?”我有些...
    開封第一講書人閱讀 163,875評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵甩十,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我吭产,道長(zhǎng)侣监,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,441評(píng)論 1 293
  • 正文 為了忘掉前任臣淤,我火速辦了婚禮橄霉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘邑蒋。我一直安慰自己姓蜂,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評(píng)論 6 392
  • 文/花漫 我一把揭開白布医吊。 她就那樣靜靜地躺著钱慢,像睡著了一般。 火紅的嫁衣襯著肌膚如雪卿堂。 梳的紋絲不亂的頭發(fā)上束莫,一...
    開封第一講書人閱讀 51,365評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼麦箍。 笑死漓藕,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的挟裂。 我是一名探鬼主播享钞,決...
    沈念sama閱讀 40,190評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼诀蓉!你這毒婦竟也來了栗竖?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,062評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤渠啤,失蹤者是張志新(化名)和其女友劉穎狐肢,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沥曹,經(jīng)...
    沈念sama閱讀 45,500評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡份名,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評(píng)論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了妓美。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片僵腺。...
    茶點(diǎn)故事閱讀 39,834評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖壶栋,靈堂內(nèi)的尸體忽然破棺而出辰如,到底是詐尸還是另有隱情,我是刑警寧澤贵试,帶...
    沈念sama閱讀 35,559評(píng)論 5 345
  • 正文 年R本政府宣布琉兜,位于F島的核電站,受9級(jí)特大地震影響毙玻,放射性物質(zhì)發(fā)生泄漏豌蟋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評(píng)論 3 328
  • 文/蒙蒙 一淆珊、第九天 我趴在偏房一處隱蔽的房頂上張望夺饲。 院中可真熱鬧,春花似錦施符、人聲如沸往声。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽浩销。三九已至,卻和暖如春听哭,著一層夾襖步出監(jiān)牢的瞬間慢洋,已是汗流浹背塘雳。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留普筹,地道東北人败明。 一個(gè)月前我還...
    沈念sama閱讀 47,958評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像太防,于是被迫代替她去往敵國和親妻顶。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 大致可以通過上述情況進(jìn)行排除 1.kafka服務(wù)器問題 查看日志是否有報(bào)錯(cuò)蜒车,網(wǎng)絡(luò)訪問問題等讳嘱。 2. kafka p...
    生活的探路者閱讀 7,589評(píng)論 0 10
  • 一、入門1酿愧、簡(jiǎn)介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,348評(píng)論 0 9
  • Apache Kafka 入門 1.kafka簡(jiǎn)介和產(chǎn)生的背景 什么是 Kafka Kafka 是一款分布式消息發(fā)...
    阿粒_lxf閱讀 1,789評(píng)論 0 0
  • 以下內(nèi)容部分翻譯至 http://kafka.apache.org/intro kafka介紹 我們認(rèn)為沥潭,一個(gè)流處...
    若與閱讀 8,763評(píng)論 0 22
  • 2017年的最后一個(gè)夜晚,在王小波的《黃金時(shí)代》里悄悄的穿行嬉挡。 走走停停钝鸽,反反復(fù)復(fù)。無數(shù)個(gè)真實(shí)而又虛幻的畫面映入腦...
    悄然Edward閱讀 199評(píng)論 0 1