kafka 入門詳解

Kafka

Kafka 核心概念

什么是 Kafka

Kafka是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái)雾叭,由Scala和Java編寫。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高吞吐、低延遲的平臺(tái)愉棱。其持久化層本質(zhì)上是一個(gè)“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊(duì)列”,這使它作為企業(yè)級(jí)基礎(chǔ)設(shè)施來處理流式數(shù)據(jù)非常有價(jià)值哲戚。此外奔滑,Kafka可以通過Kafka Connect連接到外部系統(tǒng)(用于數(shù)據(jù)輸入/輸出),并提供了Kafka Streams——一個(gè)Java流式處理庫顺少。該設(shè)計(jì)受事務(wù)日志的影響較大朋其。

基本概念

Kafka是一個(gè)分布式數(shù)據(jù)流平臺(tái)王浴,可以運(yùn)行在單臺(tái)服務(wù)器上,也可以在多臺(tái)服務(wù)器上部署形成集群梅猿。它提供了發(fā)布和訂閱功能氓辣,使用者可以發(fā)送數(shù)據(jù)到Kafka中,也可以從Kafka中讀取數(shù)據(jù)(以便進(jìn)行后續(xù)的處理)袱蚓。Kafka具有高吞吐钞啸、低延遲、高容錯(cuò)等特點(diǎn)喇潘。下面介紹一下Kafka中常用的基本概念:

  • Broker
    消息隊(duì)列中常用的概念体斩,在Kafka中指部署了Kafka實(shí)例的服務(wù)器節(jié)點(diǎn)。

  • Topic
    用來區(qū)分不同類型信息的主題颖低。比如應(yīng)用程序A訂閱了主題t1絮吵,應(yīng)用程序B訂閱了主題t2而沒有訂閱t1,那么發(fā)送到主題t1中的數(shù)據(jù)將只能被應(yīng)用程序A讀到忱屑,而不會(huì)被應(yīng)用程序B讀到蹬敲。

  • Partition
    每個(gè)topic可以有一個(gè)或多個(gè)partition(分區(qū))。分區(qū)是在物理層面上的莺戒,不同的分區(qū)對(duì)應(yīng)著不同的數(shù)據(jù)文件伴嗡。Kafka使用分區(qū)支持物理上的并發(fā)寫入和讀取,從而大大提高了吞吐量从铲。

  • Record
    實(shí)際寫入Kafka中并可以被讀取的消息記錄闹究。每個(gè)record包含了key、value和timestamp食店。

  • Producer
    生產(chǎn)者渣淤,用來向Kafka中發(fā)送數(shù)據(jù)(record)。

  • Consumer
    消費(fèi)者吉嫩,用來讀取Kafka中的數(shù)據(jù)(record)价认。

  • Consumer Group
    一個(gè)消費(fèi)者組可以包含一個(gè)或多個(gè)消費(fèi)者。使用多分區(qū)+多消費(fèi)者方式可以極大提高數(shù)據(jù)下游的處理速度自娩。

kafka 核心名詞解釋

  • Topic(主題): 每一條發(fā)送到kafka集群的消息都可以有一個(gè)類別用踩,這個(gè)類別叫做topic,不同的消息會(huì)進(jìn)行分開存儲(chǔ),如果topic很大忙迁,可以分布到多個(gè)broker上脐彩,也可以這樣理解:topic被認(rèn)為是一個(gè)隊(duì)列,每一條消息都必須指定它的topic姊扔,可以說我們需要明確把消息放入哪一個(gè)隊(duì)列惠奸。對(duì)于傳統(tǒng)的message queue而言,一般會(huì)刪除已經(jīng)被消費(fèi)的消息恰梢,而Kafka集群會(huì)保留所有的消息佛南,無論其被消費(fèi)與否梗掰。當(dāng)然,因?yàn)榇疟P限制嗅回,不可能永久保留所有數(shù)據(jù)(實(shí)際上也沒必要)及穗,因此Kafka提供兩種策略刪除舊數(shù)據(jù)。一是基于時(shí)間绵载,二是基于Partition文件大小埂陆。
  • Broker(代理): 一臺(tái)kafka服務(wù)器就可以稱之為broker.一個(gè)集群由多個(gè)broker組成,一個(gè)broker可以有多個(gè)topic

  • Partition(分區(qū)): 為了使得kafka吞吐量線性提高娃豹,物理上把topic分成一個(gè)或者多個(gè)分區(qū)焚虱,每一個(gè)分區(qū)是一個(gè)有序的隊(duì)列。且每一個(gè)分區(qū)在物理上都對(duì)應(yīng)著一個(gè)文件夾培愁,該文件夾下存儲(chǔ)這個(gè)分區(qū)所有消息和索引文件。
    分區(qū)的表示: topic名字-分區(qū)的id每個(gè)日志文件都是一個(gè)Log Entry序列缓窜,每個(gè)Log Entry包含一個(gè)4字節(jié)整型數(shù)值(值為M+5)定续,1個(gè)字節(jié)的"magic value",4個(gè)字節(jié)的CRC校驗(yàn)碼禾锤,然后跟M個(gè)字節(jié)的消息這個(gè)log entries并非由一個(gè)文件構(gòu)成私股,而是分成多個(gè)segment,每個(gè)segment以該segment第一條消息的offset命名并以“.kafka”為后綴恩掷。另外會(huì)有一個(gè)索引文件倡鲸,它標(biāo)明了每個(gè)segment下包含的log entry的offset范圍分區(qū)中每條消息都有一個(gè)當(dāng)前Partition下唯一的64字節(jié)的offset,它指明了這條消息的起始位置黄娘,Kafka只保證一個(gè)分區(qū)的數(shù)據(jù)順序發(fā)送給消費(fèi)者峭状,而不保證整個(gè)topic里多個(gè)分區(qū)之間的順序

  • Replicas(副本): 試想:一旦某一個(gè)Broker宕機(jī),則其上所有的Partition數(shù)據(jù)都不可被消費(fèi)逼争,所以需要對(duì)分區(qū)備份优床。其中一個(gè)宕機(jī)后其它Replica必須要能繼續(xù)服務(wù)并且即不能造成數(shù)據(jù)重復(fù)也不能造成數(shù)據(jù)丟失。
    如果沒有一個(gè)Leader誓焦,所有Replica都可同時(shí)讀/寫數(shù)據(jù)胆敞,那就需要保證多個(gè)Replica之間互相(N×N條通路)同步數(shù)據(jù),數(shù)據(jù)的一致性和有序性非常難保證杂伟,大大增加了Replication實(shí)現(xiàn)的復(fù)雜性移层,同時(shí)也增加了出現(xiàn)異常的幾率。而引入Leader后赫粥,只有Leader負(fù)責(zé)數(shù)據(jù)讀寫观话,F(xiàn)ollower只向Leader順序Fetch數(shù)據(jù)(N條通路),系統(tǒng)更加簡(jiǎn)單且高效越平。
    每一個(gè)分區(qū)匪燕,根據(jù)復(fù)制因子N蕾羊,會(huì)有N個(gè)副本,比如在broker1上有一個(gè)topic帽驯,分區(qū)為topic-1, 復(fù)制因子為2龟再,那么在兩個(gè)broker的數(shù)據(jù)目錄里,就都有一個(gè)topic-1,其中一個(gè)是leader尼变,一個(gè)replicas同一個(gè)Partition可能會(huì)有多個(gè)Replica利凑,而這時(shí)需要在這些Replication之間選出一個(gè)Leader,Producer和Consumer只與這個(gè)Leader交互嫌术,其它Replica作為Follower從Leader中復(fù)制數(shù)據(jù)

  • Producer: Producer將消息發(fā)布到指定的topic中哀澈,同時(shí),producer還需要指定該消息屬于哪個(gè)partition

  • Consumer: 本質(zhì)上kafka只支持topic度气,每一個(gè)consumer屬于一個(gè)consumer group割按,每個(gè)consumer group可以包含多個(gè)consumer。發(fā)送到topic的消息只會(huì)被訂閱該topic的每個(gè)group中的一個(gè)consumer消費(fèi)磷籍。如果所有的consumer都具有相同的group适荣,這種情況和queue很相似,消息將會(huì)在consumer之間均衡分配院领;如果所有的consumer都在不同的group中弛矛,這種情況就是廣播模式,消息會(huì)被發(fā)送到所有訂閱該topic的group中比然,那么所有的consumer都會(huì)消費(fèi)到該消息丈氓。kafka的設(shè)計(jì)原理決定,對(duì)于同一個(gè)topic强法,同一個(gè)group中consumer的數(shù)量不能多于partition的數(shù)量万俗,否則就會(huì)有consumer無法獲取到消息。

  • Offset: Offset專指Partition以及User Group而言饮怯,記錄某個(gè)user group在某個(gè)partiton中當(dāng)前已經(jīng)消費(fèi)到達(dá)的位置该编。

kafka使用場(chǎng)景

目前主流使用場(chǎng)景基本如下:

  • 消息隊(duì)列(MQ)
    在系統(tǒng)架構(gòu)設(shè)計(jì)中,經(jīng)常會(huì)使用消息隊(duì)列(Message Queue)——MQ硕淑。MQ是一種跨進(jìn)程的通信機(jī)制课竣,用于上下游的消息傳遞,使用MQ可以使上下游解耦置媳,消息發(fā)送上游只需要依賴MQ于樟,邏輯上和物理上都不需要依賴其他下游服務(wù)。MQ的常見使用場(chǎng)景如流量削峰拇囊、數(shù)據(jù)驅(qū)動(dòng)的任務(wù)依賴等等迂曲。在MQ領(lǐng)域,除了Kafka外還有傳統(tǒng)的消息隊(duì)列如ActiveMQ和RabbitMQ等寥袭。

  • 追蹤網(wǎng)站活動(dòng)
    Kafka最出就是被設(shè)計(jì)用來進(jìn)行網(wǎng)站活動(dòng)(比如PV路捧、UV关霸、搜索記錄等)的追蹤〗苌ǎ可以將不同的活動(dòng)放入不同的主題队寇,供后續(xù)的實(shí)時(shí)計(jì)算、實(shí)時(shí)監(jiān)控等程序使用章姓,也可以將數(shù)據(jù)導(dǎo)入到數(shù)據(jù)倉庫中進(jìn)行后續(xù)的離線處理和生成報(bào)表等佳遣。

  • Metrics
    Kafka經(jīng)常被用來傳輸監(jiān)控?cái)?shù)據(jù)。主要用來聚合分布式應(yīng)用程序的統(tǒng)計(jì)數(shù)據(jù)凡伊,將數(shù)據(jù)集中后進(jìn)行統(tǒng)一的分析和展示等零渐。

  • 日志聚合
    很多人使用Kafka作為日志聚合的解決方案。日志聚合通常指將不同服務(wù)器上的日志收集起來并放入一個(gè)日志中心系忙,比如一臺(tái)文件服務(wù)器或者HDFS中的一個(gè)目錄诵盼,供后續(xù)進(jìn)行分析處理。相比于Flume和Scribe等日志聚合工具银还,Kafka具有更出色的性能风宁。

kafka 集群搭建

安裝kefka集群

由于kafka依賴zookeeper環(huán)境所以先安裝zookeeper,zk安裝

安裝環(huán)境


linux: CentSO-7.5_x64
java: jdk1.8.0_191
zookeeper: zookeeper3.4.10
kafka: kafka_2.11-2.0.1

# 下載
$ wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

# 解壓
$ tar -zxvf kafka_2.11-2.1.0.tgz

# 編輯配置文件修改一下幾個(gè)配置
$ vim $KAFKA_HOME/config/server.properties

# 每臺(tái)服務(wù)器的broker.id都不能相同只能是數(shù)字
broker.id=1

# 修改為你的服務(wù)器的ip或主機(jī)名
advertised.listeners=PLAINTEXT://node-1:9092

# 設(shè)置zookeeper的連接端口,將下面的ip修改為你的IP稱或主機(jī)名
zookeeper.connect=node-1:2181,node-2:2181,node-3:2181

啟動(dòng)Kafka集群并測(cè)試

$ cd $KAFKA_HOME

# 分別在每個(gè)節(jié)點(diǎn)啟動(dòng)kafka服務(wù)(-daemon表示在后臺(tái)運(yùn)行)
$ bin/kafka-server-start.sh -daemon config/server.properties

# 創(chuàng)建一個(gè)名詞為 test-topic 的 Topic见剩,partitions 表示分區(qū)數(shù)量為3 --replication-factor 表示副本數(shù)量為2
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test-topic

# 查看topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

# 查看topic狀態(tài)
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic

# 查看topic詳細(xì)信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic

# 修改topic信息
$ bin/kafka-topics.sh --alter --topic test-topic --zookeeper localhost:2181 --partitions 5

# 刪除topic(簡(jiǎn)單的刪除杀糯,只是標(biāo)記刪除)
$ bin/kafka-topics.sh --delete --topic test-topic --zookeeper localhost:2181

# 在一臺(tái)服務(wù)器上創(chuàng)建一個(gè) producer (生產(chǎn)者)
$ bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092 --topic test-topic

# 在一臺(tái)服務(wù)器上創(chuàng)建一個(gè) consumer (消費(fèi)者)
$ bin/kafka-console-consumer.sh --bootstrap-server node-2:9092,node-3:9092,node-4:9092 --topic test-topic --from-beginning

# 現(xiàn)在可以在生產(chǎn)者的控制臺(tái)輸入任意字符就可以看到消費(fèi)者端有消費(fèi)消息扫俺。

java 客戶端連接kafka

普通java形式

  • pom.xml
<dependencies>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>
  • JavaKafkaConsumer.java 消費(fèi)者

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaConsumer {

    private static Logger logger = LoggerFactory.getLogger(JavaKafkaConsumer.class);

    private static Producer<String, String> producer;

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties;

    static {
        properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
    }

    public static void main(String[] args) {

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {

            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // 將偏移設(shè)置到最開始
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                logger.info("offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value());
            }
        }
    }

}
  • JavaKafkaProducer.java 生產(chǎn)者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.UUID;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaProducer {

    private static Logger logger = LoggerFactory.getLogger(JavaKafkaProducer.class);

    private static Producer<String, String> producer;

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties;

    static {
        properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
    }

    public static void main(String[] args) {

        Producer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 200; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String uuid = UUID.randomUUID().toString();
            producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), uuid));
            logger.info("send message success key: {}, value: {}", i, uuid);
        }
        producer.close();
    }

}

  • KafkaClient.java

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;

import java.util.*;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class KafkaClient {

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties = new Properties();

    static {
        properties.put("bootstrap.servers", KAFKA_BROKER);
    }

    /**
     * 創(chuàng)建topic
     */
    @Test
    public void createTopic() {
        AdminClient adminClient = AdminClient.create(properties);
        List<NewTopic> newTopics = Arrays.asList(new NewTopic(TOPIC, 1, (short) 1));
        CreateTopicsResult result = adminClient.createTopics(newTopics);
        try {
            result.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 創(chuàng)建topic
     */
    @Test
    public void create() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // 創(chuàng)建一個(gè)3個(gè)分區(qū)2個(gè)副本名為t1的topic
        AdminUtils.createTopic(zkUtils, "t1", 3, 2, new Properties(), RackAwareMode.Enforced$.MODULE$);
        zkUtils.close();
    }

    /**
     * 查詢topic
     */
    @Test
    public void listTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // 獲取 topic 所有屬性
        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "streaming-topic");

        Iterator it = props.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            System.err.println(entry.getKey() + " = " + entry.getValue());
        }
        zkUtils.close();
    }

    /**
     * 修改topic
     */
    @Test
    public void updateTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "log-test");
        // 增加topic級(jí)別屬性
        props.put("min.cleanable.dirty.ratio", "0.4");
        // 刪除topic級(jí)別屬性
        props.remove("max.message.bytes");
        // 修改topic 'test'的屬性
        AdminUtils.changeTopicConfig(zkUtils, "log-test", props);
        zkUtils.close();

    }

    /**
     * 刪除topic 't1'
     */
    @Test
    public void deleteTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        AdminUtils.deleteTopic(zkUtils, "t1");
        zkUtils.close();
    }


}

  • log4j.properties 日志配置
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

基于spirngboot整合kafka

  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <artifactId>spring-boot-kafka</artifactId>
    <groupId>com.andy</groupId>
    <version>1.0.7.RELEASE</version>
    
    <packaging>jar</packaging>
    <modelVersion>4.0.0</modelVersion>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.spring.platform</groupId>
                <artifactId>platform-bom</artifactId>
                <version>Cairo-SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.3.RELEASE</version>
                <configuration>
                    <!--<mainClass>${start-class}</mainClass>-->
                    <layout>ZIP</layout>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • application.yml
spring:
  application:
    name: spring-jms
  kafka:
    bootstrap-servers: node-2:9092,node-3:9092,node-4:9092
    producer:
      retries:
      batch-size: 16384
      buffer-memory: 33554432
      compressionType: snappy
      acks: all
    consumer:
      group-id: 0
      auto-offset-reset: earliest
      enable-auto-commit: true
  • Message.java 消息

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
@ToString
public class Message<T> {

    private Long id;

    private T message;

    private Date time;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public T getMessage() {
        return message;
    }

    public void setMessage(T message) {
        this.message = message;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }
}
  • KafkaController.java 控制器
import com.andy.jms.kafka.service.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * <p> 
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/kafka/{topic}")
    public String send(@PathVariable("topic") String topic, @RequestParam String message) {
        kafkaSender.send(topic, message);
        return "success";
    }

}
  • KafkaReceiver.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * <p> 
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaReceiver {


    @KafkaListener(topics = {"order"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("record:{}", record);
            log.info("message:{}", message);
        }
    }
}
  • KafkaSender.java
import com.andy.jms.kafka.commen.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     *
     * @param topic
     * @param body
     */
    public void send(String topic, Object body) {
        Message<String> message = new Message<>();
        message.setId(System.currentTimeMillis());
        message.setMessage(body.toString());
        message.setTime(new Date());
        String content = null;
        try {
            content = objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        kafkaTemplate.send(topic, content);
        log.info("send {} to {} success!", message, topic);
    }
}

  • 啟動(dòng)類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Leone
 * @since 2018-04-10
 **/
@SpringBootApplication
public class JmsApplication {
    public static void main(String[] args) {
        SpringApplication.run(JmsApplication.class, args);
    }
}

github地址

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末苍苞,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子狼纬,更是在濱河造成了極大的恐慌羹呵,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,110評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疗琉,死亡現(xiàn)場(chǎng)離奇詭異冈欢,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)盈简,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門凑耻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人柠贤,你說我怎么就攤上這事香浩。” “怎么了臼勉?”我有些...
    開封第一講書人閱讀 165,474評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵邻吭,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我宴霸,道長(zhǎng)囱晴,這世上最難降的妖魔是什么膏蚓? 我笑而不...
    開封第一講書人閱讀 58,881評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮畸写,結(jié)果婚禮上驮瞧,老公的妹妹穿的比我還像新娘。我一直安慰自己艺糜,他們只是感情好剧董,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著破停,像睡著了一般翅楼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上真慢,一...
    開封第一講書人閱讀 51,698評(píng)論 1 305
  • 那天毅臊,我揣著相機(jī)與錄音,去河邊找鬼黑界。 笑死管嬉,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的朗鸠。 我是一名探鬼主播蚯撩,決...
    沈念sama閱讀 40,418評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼烛占!你這毒婦竟也來了胎挎?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,332評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤忆家,失蹤者是張志新(化名)和其女友劉穎犹菇,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體芽卿,經(jīng)...
    沈念sama閱讀 45,796評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡揭芍,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了卸例。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片称杨。...
    茶點(diǎn)故事閱讀 40,110評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖筷转,靈堂內(nèi)的尸體忽然破棺而出姑原,到底是詐尸還是另有隱情,我是刑警寧澤旦装,帶...
    沈念sama閱讀 35,792評(píng)論 5 346
  • 正文 年R本政府宣布页衙,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏店乐。R本人自食惡果不足惜艰躺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望眨八。 院中可真熱鬧腺兴,春花似錦、人聲如沸廉侧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽段誊。三九已至闰蚕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間连舍,已是汗流浹背没陡。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留索赏,地道東北人盼玄。 一個(gè)月前我還...
    沈念sama閱讀 48,348評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像潜腻,于是被迫代替她去往敵國(guó)和親埃儿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容