kafka的配置和使用

安裝
需要在機(jī)器上配置jdk環(huán)境
下載kafka對應(yīng)版本wget wget http://mirror.bit.edu.cn/apache/kafka/0.11.0.2/kafka_2.12-0.11.0.2.tgz
解壓后可以看到目錄

image.png

bin:包含Kafka運(yùn)行的所有腳本蔫骂,如:start/stop Zookeeper西疤,start/stop Kafka
libs:Kafka運(yùn)行的依賴庫
config:zookeeper兢交,Logger,Kafka等相關(guān)配置文件
sit-docs:Kafka相關(guān)文檔

kafka的配置方式
單節(jié)點(diǎn)-單Broker集群:只在一個(gè)節(jié)點(diǎn)上部署一個(gè)Broker
單節(jié)點(diǎn)-多Broker集群:在一個(gè)節(jié)點(diǎn)上部署多個(gè)Broker,只不過各個(gè)Broker以不同的端口啟動(dòng)
多節(jié)點(diǎn)-多Broker集群:以上兩種的組合,每個(gè)節(jié)點(diǎn)上部署一到多個(gè)Broker,且各個(gè)節(jié)點(diǎn)連接起來

這里選擇使用kafka自帶zookeeper來存儲(chǔ)集群元數(shù)據(jù)和Consumer信息。
也可以獨(dú)立部署來進(jìn)行存儲(chǔ)匆赃。
啟動(dòng)
第一步啟動(dòng)zookeeper服務(wù)


image.png


今缚。

image.png

啟動(dòng)成功2181端口就是zookeeper端口
可以通過修改config/zookeeper.properties 文件進(jìn)行修改
第二部啟動(dòng)kafka服務(wù)


image.png

使用kafka
通過命令新建topic


image.png

在當(dāng)前節(jié)點(diǎn)上新建一個(gè)名稱為topic1的topic
校驗(yàn)topic是否創(chuàng)建成功


image.png

topic已經(jīng)創(chuàng)建成功算柳,可以使用了。
Producer發(fā)送消息hello word!
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

hello world!
Consummer接受消息


image.png

下面開始編碼實(shí)現(xiàn)功能姓言。
客戶端使用lib包在服務(wù)器安裝libs目錄下


image.png

本機(jī)外調(diào)用需要修改server.properties文件
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.33:9092
zookeeper.connect=192.168.1.33:2181

代碼分:配置埠居、生產(chǎn)者查牌、消費(fèi)者、調(diào)用main方法4部分組成
配置文件
package com.main;

public class KafkaProperties {
public static final String TOPIC = "topic1";
public static final String KAFKA_SERVER_URL = "192.168.1.33";
public static final int KAFKA_SERVER_PORT = 9092;
public static final int KAFKA_CONSUMER_PORT=2181;
public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
public static final int CONNECTION_TIMEOUT = 1000;
public static final String CLIENT_ID = "SimpleConsumerDemoClient";

private KafkaProperties() {}

}
生產(chǎn)者
package com.main;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer extends Thread{
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;

public Producer(String topic, Boolean isAsync) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producer = new KafkaProducer<Integer, String>(props);
    this.topic = topic;
    this.isAsync = isAsync;
}

public void run() {
    int messageNo = 1;
    while (true) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        if (isAsync) { // Send asynchronously
            producer.send(new ProducerRecord<Integer, String>(topic,
                messageNo,
                messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            if(messageNo==100){
                break;
            }
        } else { // Send synchronously
            try {
                producer.send(new ProducerRecord<Integer, String>(topic,
                    messageNo,
                    messageStr)).get();
                System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
            } catch (ExecutionException e) {
                e.printStackTrace();
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ++messageNo;
    }
}

}
class DemoCallBack implements Callback {

private final long startTime;
private final int key;
private final String message;

public DemoCallBack(long startTime, int key, String message) {
    this.startTime = startTime;
    this.key = key;
    this.message = message;
}

/**
 * A callback method the user can implement to provide asynchronous handling of request completion. This method will
 * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
 * non-null.
 *
 * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
 *                  occurred.
 * @param exception The exception thrown during processing of this record. Null if no error occurred.
 */
public void onCompletion(RecordMetadata metadata, Exception exception) {
    long elapsedTime = System.currentTimeMillis() - startTime;
    if (metadata != null) {
        System.out.println(
            "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                "), " +
                "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
    } else {
        exception.printStackTrace();
    }
}

}
消費(fèi)者
package com.main;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import kafka.utils.ShutdownableThread;

public class Consumer extends ShutdownableThread{
private final KafkaConsumer<Integer, String> consumer;
private final String topic;

public Consumer(String topic) {
    super("KafkaConsumerExample", false);
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<Integer,String>(props);
    this.topic = topic;
}

@Override
public void doWork() {
    System.out.println("doWork");
    consumer.subscribe(Collections.singletonList(this.topic));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    for (ConsumerRecord<Integer, String> record : records) {
        System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
    }
}

@Override
public String name() {
    return null;
}

@Override
public boolean isInterruptible() {
    return false;
}

}
調(diào)用方法
package com.main;

public class KafkaConsumerProducerDemo {
public static void main(String[] args) {
boolean isAsync = true;
Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
producerThread.start();

    Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
    consumerThread.start();

}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末滥壕,一起剝皮案震驚了整個(gè)濱河市纸颜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌绎橘,老刑警劉巖胁孙,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異称鳞,居然都是意外死亡涮较,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門冈止,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狂票,“玉大人,你說我怎么就攤上這事熙暴」胧簦” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵周霉,是天一觀的道長掂器。 經(jīng)常有香客問我,道長俱箱,這世上最難降的妖魔是什么国瓮? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮狞谱,結(jié)果婚禮上乃摹,老公的妹妹穿的比我還像新娘。我一直安慰自己跟衅,他們只是感情好峡懈,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著与斤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪荚恶。 梳的紋絲不亂的頭發(fā)上撩穿,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天,我揣著相機(jī)與錄音谒撼,去河邊找鬼食寡。 笑死,一個(gè)胖子當(dāng)著我的面吹牛廓潜,可吹牛的內(nèi)容都是我干的抵皱。 我是一名探鬼主播善榛,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼呻畸!你這毒婦竟也來了移盆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤伤为,失蹤者是張志新(化名)和其女友劉穎咒循,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绞愚,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡叙甸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了位衩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片裆蒸。...
    茶點(diǎn)故事閱讀 40,102評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖糖驴,靈堂內(nèi)的尸體忽然破棺而出僚祷,到底是詐尸還是另有隱情,我是刑警寧澤遂赠,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布久妆,位于F島的核電站,受9級特大地震影響跷睦,放射性物質(zhì)發(fā)生泄漏筷弦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一抑诸、第九天 我趴在偏房一處隱蔽的房頂上張望烂琴。 院中可真熱鬧,春花似錦蜕乡、人聲如沸奸绷。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽号醉。三九已至,卻和暖如春辛块,著一層夾襖步出監(jiān)牢的瞬間畔派,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工润绵, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留线椰,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓尘盼,卻偏偏與公主長得像憨愉,于是被迫代替她去往敵國和親烦绳。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理配紫,服務(wù)發(fā)現(xiàn)径密,斷路器,智...
    卡卡羅2017閱讀 134,662評論 18 139
  • 大致可以通過上述情況進(jìn)行排除 1.kafka服務(wù)器問題 查看日志是否有報(bào)錯(cuò)笨蚁,網(wǎng)絡(luò)訪問問題等睹晒。 2. kafka p...
    生活的探路者閱讀 7,589評論 0 10
  • Kafka簡介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)括细。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,083評論 0 43
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,833評論 4 54
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,469評論 0 34