kafka-第一章-初步認(rèn)識(shí)kafka

學(xué)習(xí)大綱


學(xué)習(xí)大綱

一指煎、kafka介紹

Kafka最初是由Linkedln公司采用Scala語(yǔ)言開(kāi)發(fā)的一個(gè)多分區(qū)粗蔚、多副本并且基于ZooKeeper協(xié)調(diào)的分布式消息系統(tǒng)尝偎,現(xiàn)在已經(jīng)捐獻(xiàn)給了Apache基金會(huì)。目前Kafka已經(jīng)定位為一個(gè)分布式流式處理平臺(tái)鹏控,它以高吞吐致扯、可持久化、可水平擴(kuò)展当辐、支持流處理等多種特性而被廣泛應(yīng)用抖僵。

Apache Kafka是一個(gè)分布式的發(fā)布-訂閱消息系統(tǒng),能夠支撐海量數(shù)據(jù)的數(shù)據(jù)傳遞缘揪。在離線和實(shí)時(shí)的消息處理業(yè)務(wù)系統(tǒng)中耍群,Kafka都有廣泛的應(yīng)用义桂。Kafka將消息持久化到磁盤(pán)中,并對(duì)消息創(chuàng)建了備份保證了數(shù)據(jù)的安全蹈垢。Kafka在保證了較高的處理速度的同時(shí)慷吊,又能保證數(shù)據(jù)處理的低延遲和數(shù)據(jù)的零丟失。

Kafka官網(wǎng)主頁(yè)
Kafka官方文檔

1曹抬、特性

(1)高吞吐量溉瓶、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒沐祷,每個(gè)主題可以分多個(gè)分區(qū),消費(fèi)組對(duì)分區(qū)進(jìn)行消費(fèi)操作;
(2)可擴(kuò)展性:kafka集群支持熱擴(kuò)展嚷闭,
(3)持久性攒岛、可靠性:消息被持久化到本地磁盤(pán)赖临,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失;
(4)容錯(cuò)性:允許集群中節(jié)點(diǎn)失斣志狻(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失斁ふァ)
(5)高并發(fā):支持?jǐn)?shù)千個(gè)客戶(hù)端同時(shí)讀寫(xiě);

2、使用場(chǎng)景

(1)日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log顺饮,通過(guò)kafka以統(tǒng)一接口服務(wù)的方式開(kāi)放給各種consumer吵聪,例如Hadoop、Hbase兼雄、Solr等;
(2)消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者吟逝、緩存消息等;
(3)用戶(hù)活動(dòng)跟蹤:Kafka經(jīng)常被用來(lái)記錄web用戶(hù)或者app用戶(hù)的各種活動(dòng),如瀏覽網(wǎng)頁(yè)赦肋、搜索块攒、點(diǎn)擊等活動(dòng),這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中佃乘,然后訂閱者通過(guò)訂閱這些topic來(lái)做實(shí)時(shí)的監(jiān)控分析囱井,或者裝載到Hadoop、數(shù)據(jù)倉(cāng)庫(kù)中做離線分析和挖掘;
(4)運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來(lái)記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)趣避。包括收集各種分布式應(yīng)用的數(shù)據(jù)庞呕,生產(chǎn)各種操作的集中反
饋,比如報(bào)警和報(bào)告﹔
(5)流式處理:比如spark streaming和storm;

3程帕、技術(shù)優(yōu)勢(shì)
  • 可伸縮性:Kafka的兩個(gè)重要特性造就了它的可伸縮性住练。
    1、Kafka集群在運(yùn)行期間可以輕松地?cái)U(kuò)展或收縮(可以添加或刪除代理)愁拭,而不會(huì)宕機(jī)澎羞。
    2、可以擴(kuò)展一個(gè)Kafka主題來(lái)包含更多的分區(qū)敛苇。由于一個(gè)分區(qū)無(wú)法擴(kuò)展到多個(gè)代理妆绞,所以它的容量受到代理磁盤(pán)空間的限制顺呕。能夠增加分區(qū)和代理的數(shù)量意味著單個(gè)主題可以存儲(chǔ)的數(shù)據(jù)量是沒(méi)有限制的。
  • 容錯(cuò)性和可靠性:
    Kafka的設(shè)計(jì)方式使某個(gè)代理的故障能夠被集群中的其他代理檢測(cè)到括饶。由于每個(gè)主題都可以在多個(gè)代理上復(fù)制株茶,所以集群可以在不中斷服務(wù)的情況下從此類(lèi)故障中恢復(fù)并繼續(xù)運(yùn)行。
  • 吞吐量
    代理能夠以超快的速度有效地存儲(chǔ)和檢索數(shù)據(jù)图焰。

二启盛、概念詳解

概念詳解
1、Producer

生產(chǎn)者即數(shù)據(jù)的發(fā)布者技羔,該角色將消息發(fā)布到Kafka的topic中僵闯。broker接收到生產(chǎn)者發(fā)送的消息后,broker將該消息追加到當(dāng)前用于追加數(shù)據(jù)的segment文件中藤滥。生產(chǎn)者發(fā)送的消息鳖粟,存儲(chǔ)到一個(gè)partition中,生產(chǎn)者也可以指定數(shù)據(jù)存儲(chǔ)的partition拙绊。

2向图、Consumer

消費(fèi)者可以從broker中讀取數(shù)據(jù)。消費(fèi)者可以消費(fèi)多個(gè)topic中的數(shù)據(jù)标沪。

3榄攀、Topic

在Kafka中,使用一個(gè)類(lèi)別屬性來(lái)劃分?jǐn)?shù)據(jù)的所屬類(lèi)金句,劃分?jǐn)?shù)據(jù)的這個(gè)類(lèi)稱(chēng)為topic檩赢。如果把Kafka看做為一個(gè)數(shù)據(jù)庫(kù),topic可以理解為數(shù)據(jù)庫(kù)中的一張表违寞,topic的名字即為表名贞瞒。

4、Partition

topic中的數(shù)據(jù)分割為一個(gè)或多個(gè)partition坞靶。每個(gè)topic至少有一partition憔狞。每個(gè)partition中的數(shù)據(jù)使用多個(gè)segment文件存儲(chǔ)。partition中的數(shù)據(jù)是有序的彰阴,partition間的數(shù)據(jù)丟失了數(shù)據(jù)的順序瘾敢。如果topic有多個(gè)partition,消費(fèi)數(shù)據(jù)時(shí)就不能保證數(shù)據(jù)的順序尿这。在需要嚴(yán)格保證消息的消費(fèi)順序的場(chǎng)景下簇抵,需要將partition數(shù)目設(shè)為1。

5射众、Partition offset

每條消息都有一個(gè)當(dāng)前Partition下唯一的64字節(jié)的offset碟摆,它指明了這條消息的起始位置。

6叨橱、Replicas of partition

副本是一個(gè)分區(qū)的備份典蜕。副本不會(huì)被消費(fèi)者消費(fèi)断盛,副本只用于防止數(shù)據(jù)丟失,即消費(fèi)者不從為follower的partition中消費(fèi)數(shù)據(jù)愉舔,而是從為leader的partition中讀取數(shù)據(jù)钢猛。副本之間是一主多從的關(guān)系。

7轩缤、Broker

Kafka集群包含一個(gè)或多個(gè)服務(wù)器命迈,服務(wù)器節(jié)點(diǎn)稱(chēng)為broker。broker存儲(chǔ)topic的數(shù)據(jù)火的。如果某topic有N個(gè)partition壶愤,集群有N個(gè)broker,那么每個(gè)broker存儲(chǔ)該topic的一個(gè)partition馏鹤。如果某topic有N個(gè)partition征椒,集群有(N+M)個(gè)broker,那么其中有N個(gè)broker存儲(chǔ)該topic的一個(gè)partition假瞬,剩下的M個(gè)broker不存儲(chǔ)該topic的partition數(shù)據(jù)陕靠。如果某topic有N個(gè)partition迂尝,集群中broker數(shù)目少于N個(gè)脱茉,那么一個(gè)broker存儲(chǔ)該topic的一個(gè)或多個(gè)partition。在實(shí)際生產(chǎn)環(huán)境中垄开,盡量避免這種情況的發(fā)生琴许,這種情況容易導(dǎo)致Kafka集群數(shù)據(jù)不均衡。

8溉躲、Leader

每個(gè)partition有多個(gè)副本榜田,其中有且僅有一個(gè)作為L(zhǎng)eader,Leader是當(dāng)前負(fù)責(zé)數(shù)據(jù)的讀寫(xiě)的partition锻梳。

9箭券、Follower

Follower跟隨Leader,所有寫(xiě)請(qǐng)求都通過(guò)Leader路由疑枯,數(shù)據(jù)變更會(huì)廣播給所有Follower辩块,F(xiàn)ollower與Leader保持?jǐn)?shù)據(jù)同步。如果Leader失效荆永,則從Follower中選舉出一個(gè)新的Leader废亭。當(dāng)Follower與Leader掛掉、卡住或者同步太慢具钥,leader會(huì)把這個(gè)follower從"in sync replicas”(ISR)列表中刪除豆村,重新創(chuàng)建一個(gè)Follower。

10骂删、zookeeper

Zookeeper負(fù)責(zé)維護(hù)和協(xié)調(diào)broker掌动。當(dāng)Kafka系統(tǒng)中新增了broker或者某個(gè)broker發(fā)生故障失效時(shí)四啰,由ZooKeeper通知生產(chǎn)者和消費(fèi)者。生產(chǎn)者和消費(fèi)者依據(jù)Zookeeper的broker狀態(tài)信息與broker協(xié)調(diào)數(shù)據(jù)的發(fā)布和訂閱任務(wù)粗恢。

11拟逮、AR(Assigned Replicas)

分區(qū)中所有的副本統(tǒng)稱(chēng)為AR。

12适滓、ISR(In-Sync Replicas)

所有與Leader部分保持一定程度的副(包括Leader副本在內(nèi))本組成ISR敦迄。

13、OSR(Out-of-Sync-Replicas)

與Leader副本同步滯后過(guò)多的副本凭迹。

14罚屋、HW(High Watermark)

高水位,標(biāo)識(shí)了一個(gè)特定的offset嗅绸,消費(fèi)者只能拉取到這個(gè)offset之前的消息脾猛。

15、LEO(Log End Offset)

即日志末端位移(log end offset)鱼鸠,記錄了該副本底層日志(log)中下一條消息的位移值猛拴。注意是下一條消息!也就是說(shuō),如果LEO=10蚀狰,那么表示該副本保存了10條消息愉昆,位移值范圍是[0,9]麻蹋。


LEO

三跛溉、安裝與配置

1、使用Kafka需求有以下前置條件
  • Linux系統(tǒng)/windows系統(tǒng)(博主選的是windows)
  • java環(huán)境
  • zookeeper
    zookeeper下載地址
    zookeeper下載

注意著兩個(gè)文件都要下載扮授,并把a(bǔ)pache-zookeeper-3.5.8-bin.tar包里面的lib放到apache-zookeeper-3.5.8.tar.gz里面芳室,不然在運(yùn)行zookeeper的時(shí)候如果閃退。如果閃退可以在配置文件上寫(xiě)上pause刹勃,這樣方便查看錯(cuò)誤

錯(cuò)誤: 找不到或無(wú)法加載主類(lèi) org.apache.zookeeper.server.quorum.QuorumPeerMain
image.png

image.png
  • Kafka環(huán)境安裝
    Kafka下載地址
    下載鏈接

    將下載文件解壓后運(yùn)行如下命令,即可啟動(dòng)成功
.\bin\windows\kafka-server-start.bat .\config\server.properties
2堪侯、Kafka測(cè)試消息生產(chǎn)與消費(fèi)
  • 創(chuàng)建主題
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic haijia--partitions 2--replication-factor 1

--zookeeper:指定了Kafka所連接的Zookeeper服務(wù)地址
--create:創(chuàng)建主題的動(dòng)作指令
--topic:指定了所要?jiǎng)?chuàng)建主題的名稱(chēng)
--partitions:指定了分區(qū)個(gè)數(shù)
--replication-factor:指定了副本因子

  • 展示所有的主題
.bin/kafka-topics.sh --zookeeper localhost:2181 --list
  • 查看主題詳情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic haijia
  • 刪除主題
bin/kafka-topics.sh --zookeeper localhost:2181  --delete --topic haijia

需要 server.properties 中設(shè)置 delete.topic.enable=true 否則只是標(biāo)記刪除。
若delete.topic.enable=true荔仁,直接徹底刪除該Topic伍宦。
若delete.topic.enable=false ,如果當(dāng)前Topic沒(méi)有使用過(guò)即沒(méi)有傳輸過(guò)信息:可以徹底刪除咕晋。如果當(dāng)前Topic有使用過(guò)即有過(guò)傳輸過(guò)信息雹拄,并沒(méi)有真正刪除Topic只是把這個(gè)Topic標(biāo)記為刪除(marked for deletion),重啟Kafka Server后刪除掌呜。

  • 增加分區(qū)
bin/kafka-topics.sh --zookeeper localhost:2181  --alter --topic haijia   --partitions 6

修改分區(qū)數(shù)時(shí)滓玖,僅能增加分區(qū)個(gè)數(shù)。若是用其減少 partition個(gè)數(shù)质蕉,則會(huì)報(bào)錯(cuò)誤信息

  • 啟動(dòng)消費(fèi)段接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic haijia

--bootstrap-server 指定了連接Kafka集群的地址
-topic 指定了消費(fèi)端訂閱的主題

  • 生產(chǎn)端發(fā)送消息
bin/kafka-console-pfoducer.sh --broker-list localhost:9092--topic haijia

--broker-list 指定了連接的Kafka集群的地址
--topic指定了發(fā)送消息時(shí)的主題

四势篡、Java第一個(gè)程序

1翩肌、快速啟動(dòng)環(huán)境
D:\Software\apacheZookeeper3.5.8\apache-zookeeper-3.5.8\bin>zkServer.cmd

D:\Software\kafka_2.13-2.6.0>.\bin\windows\kafka-server-start.bat .\config\server.properties
1、新建項(xiàng)目
springweb

kafka

也可以在pom.xml引入依賴(lài)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.haijia</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.0.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.0.RELEASE</version>
                <configuration>
                    <mainClass>com.haijia.kafka.KafkaApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2禁悠、application.yml中引入kafka相關(guān)配置
spring:
  kafka:
    #如果是集群念祭,用 ,分隔碍侦,如:112.126.74.249:9092,112.126.74.249:9093,172.101.203.33:9092
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # 發(fā)生錯(cuò)誤后粱坤,消息重發(fā)的次數(shù)。
      retries: 0
      #當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí)瓷产,生產(chǎn)者會(huì)把它們放在同一個(gè)批次里站玄。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算濒旦。
      batch-size: 16384
      # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小株旷。
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 應(yīng)答級(jí)別:
      # acks=0 : 生產(chǎn)者在成功寫(xiě)入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)。
      # acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息尔邓,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器成功響應(yīng)晾剖。
      # acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)梯嗽。
      acks: 1
      #properties:
        # 自定義分區(qū)器
      #  partitioner-class: com.felix.kafka.producer.CustomizePartitioner
        # 提交延時(shí):當(dāng)生產(chǎn)端積累的消息達(dá)到batch-size或接收到消息linger.ms后,生產(chǎn)者就會(huì)將消息提交給kafka,
        #    linger.ms為0表示每接收到一條消息就提交給kafka,這時(shí)候batch-size其實(shí)就沒(méi)用了
      #  linger-ms: 0

    consumer:
      # 自動(dòng)提交的時(shí)間間隔 在spring boot 2.X 版本中這里采用的是值的類(lèi)型為Duration 需要符合特定的格式齿尽,如1S,1M,2H,5D
      auto-commit-interval: 1S

      #  spring.kafka.consumer.enable-auto-commit=true->是否自動(dòng)提交offset
      # spring.kafka.consumer.auto.commit.interval.ms=1000->提交offset延時(shí)(接收到消息后多久提交offset)
      # spring.kafka.consumer.properties.session.timeout.ms=120000->消費(fèi)會(huì)話超時(shí)時(shí)間(超過(guò)這個(gè)時(shí)間consumer沒(méi)有發(fā)送心跳,就會(huì)觸發(fā)rebalance操作)
      # spring.kafka.consumer.properties.request.timeout.ms=18000->消費(fèi)請(qǐng)求超時(shí)時(shí)間

      # 該屬性指定了消費(fèi)者在讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下該作何處理:
      # latest(默認(rèn)值)在偏移量無(wú)效的情況下,消費(fèi)者將從最新的記錄開(kāi)始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)
      # earliest :在偏移量無(wú)效的情況下慷荔,消費(fèi)者將從起始位置讀取分區(qū)的記錄
      auto-offset-reset: earliest
      # 是否自動(dòng)提交偏移量雕什,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失缠俺,可以把它設(shè)置為false,然后手動(dòng)提交偏移量
      enable-auto-commit: false
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在偵聽(tīng)器容器中運(yùn)行的線程數(shù)显晶。
      concurrency: 5
      #listner負(fù)責(zé)ack,每調(diào)用一次壹士,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

swagger:
  enabled: true
3磷雇、producer
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
@Slf4j
public class KafkaProducer {

    //自定義topic
    public static final String TOPIC_TEST = "topic.test";
    
    public static final String TOPIC_GROUP1 = "topic.group1";
    
    public static final String TOPIC_GROUP2 = "topic.group2";
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(Object obj) {
        String obj2String = JSONObject.toJSONString(obj);
        log.info("準(zhǔn)備發(fā)送消息為:{}", obj2String);
        //發(fā)送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //發(fā)送失敗的處理
                log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失敗:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的處理
                log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}
4躏救、Consumer
package com.haijia.kafka.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info(KafkaProducer.TOPIC_GROUP1+"KafkaConsumer 接收到消息");
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test 消費(fèi)了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
    public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info(KafkaProducer.TOPIC_GROUP2+"KafkaConsumer 接收到消息");
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test1 消費(fèi)了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }
}

5唯笙、Controller
import com.haijia.kafka.kafka.KafkaProducer;
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;

import java.util.Map;

@RequestMapping("/kafka")
@Controller
@Slf4j
@Api(value = "SwaggerValue", tags = "KafkaController", description = "測(cè)試接口相關(guān)")//,produces = Media
//Api【tags=“說(shuō)明該類(lèi)的作用,可以在UI界面上看到的注解”】
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;


    @GetMapping(value = "/send")
    @ResponseBody
    @Transactional(rollbackFor = Exception.class)
    @ApiOperation(value = "kafka發(fā)送消息", notes = "發(fā)送消息")
    //ApiOperation【value=“說(shuō)明方法的用途盒使、作用”;notes=“方法的備注說(shuō)明】
    @ApiImplicitParam(name = "id", value = "用戶(hù)ID", required = false, dataType = "Long", paramType = "query")
    //ApiImplicitParam【name:參數(shù)名崩掘;value:參數(shù)的漢字說(shuō)明、解釋?zhuān)籨ataType: 參數(shù)類(lèi)型少办,默認(rèn)String苞慢;required : 參數(shù)是否必傳,true必傳
    //   defaultValue:參數(shù)的默認(rèn)值英妓;paramType:參數(shù)放在哪個(gè)地方挽放;header請(qǐng)求參數(shù)的獲取:@RequestHeader,參數(shù)從請(qǐng)求頭獲取
    //   query請(qǐng)求參數(shù)的獲取:@RequestParam,參數(shù)從地址欄問(wèn)號(hào)后面的參數(shù)獲壬苋;path(用于restful接口)請(qǐng)求參數(shù)的獲燃琛:@PathVariable吗蚌,參數(shù)從URL地址上獲取
    //   body(不常用)參數(shù)從請(qǐng)求體中獲取纯出;form(不常用)參數(shù)從form表單中獲取】
    //@ApiIgnore
    //ApiIgnore: 使用該注解忽略這個(gè)API蚯妇,不會(huì)生成接口文檔≡蒹荩可注解在類(lèi)和方法上
    //@ApiResponse(code = 400,message = "參數(shù)錯(cuò)誤",response = "拋出異常的類(lèi)")
    //ApiResponse:響應(yīng)介紹
    //@ApiResponses(一組響應(yīng))
    //@ApiModel(用在返回對(duì)象類(lèi)上侮措,描述一個(gè)Model的信息(這種一般用在post創(chuàng)建的時(shí)候,使用@RequestBody這樣的場(chǎng)景乖杠,請(qǐng)求參數(shù)無(wú)法使用@ApiImplicitParam注解進(jìn)行描述的時(shí)候)
    //@ApiModelProperty[value = 字段說(shuō)明,name = 重寫(xiě)屬性名字;dataType = 重寫(xiě)屬性類(lèi)型;required = 是否必填分扎,true必填
    //         example = 舉例說(shuō)明;hidden = 隱藏]
    public void sendMsg() {
        kafkaProducer.send("this is a kafka top message!");

    }

   /* @RequestMapping(value = "/save", method = RequestMethod.POST)
    //@ApiImplicitParam(name = "user", value = "用戶(hù)實(shí)體user", required = true, dataType = "User")
    @ApiOperation(value = "創(chuàng)建用戶(hù)", notes = "創(chuàng)建用戶(hù)")
    public Map<String, Object> saveUser(@ApiParam(required = true, name = "user", value = "用戶(hù)實(shí)體user") @RequestBody @Valid Object user) {return null;}*/
}
6、Swagger
package com.haijia.kafka.common;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import springfox.documentation.spi.DocumentationType;

@Configuration
//注解開(kāi)啟 swagger2 功能,啟動(dòng)后訪問(wèn) http://localhost:8080/swagger-ui.html
@EnableSwagger2
public class Swagger2Config {

    //是否開(kāi)啟swagger胧洒,正式環(huán)境一般是需要關(guān)閉的
    @Value("${swagger.enabled}")
    private boolean enableSwagger;

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                //是否開(kāi)啟 (true 開(kāi)啟  false隱藏畏吓。生產(chǎn)環(huán)境建議隱藏)
                .enable(enableSwagger)
                .select()
                //掃描的路徑包,設(shè)置basePackage會(huì)將包下的所有被@Api標(biāo)記類(lèi)的所有方法作為api
                .apis(RequestHandlerSelectors.basePackage("com.haijia.kafka.controller"))
                //指定路徑處理PathSelectors.any()代表所有的路徑
                .paths(PathSelectors.any())
                .build();
    }

    /**
     *
     * - swagger.title=標(biāo)題
     * - swagger.description=描述
     * - swagger.version=版本
     * - swagger.license=許可證
     * - swagger.licenseUrl=許可證URL
     * - swagger.termsOfServiceUrl=服務(wù)條款URL
     * - swagger.contact.name=維護(hù)人
     * - swagger.contact.url=維護(hù)人URL
     * - swagger.contact.email=維護(hù)人email
     * - swagger.base-package=swagger掃描的基礎(chǔ)包,默認(rèn):全掃描
     * - swagger.base-path=需要處理的基礎(chǔ)URL規(guī)則卫漫,默認(rèn):/**
     * - swagger.exclude-path=需要排除的URL規(guī)則菲饼,默認(rèn):空
     * - swagger.host=文檔的host信息,默認(rèn):空
     */
    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                //設(shè)置文檔標(biāo)題(API名稱(chēng))
                .title("SpringBoot中使用Swagger2構(gòu)建RESTful接口")
                //文檔描述
                .description("接口說(shuō)明")
                //服務(wù)條款URL
                .termsOfServiceUrl("http://127.0.0.1:8080/")
                //聯(lián)系信息
                .contact(new Contact("黃海佳","www.baidu.com","9218*413@qq.com"))
                //版本號(hào)
                .version("1.0.0")
                .build();
    }
}

7列赎、請(qǐng)求結(jié)果如下
請(qǐng)求結(jié)果
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末宏悦,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子包吝,更是在濱河造成了極大的恐慌饼煞,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诗越,死亡現(xiàn)場(chǎng)離奇詭異砖瞧,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)嚷狞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)块促,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人床未,你說(shuō)我怎么就攤上這事竭翠。” “怎么了薇搁?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵斋扰,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我,道長(zhǎng)褥实,這世上最難降的妖魔是什么呀狼? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮损离,結(jié)果婚禮上哥艇,老公的妹妹穿的比我還像新娘。我一直安慰自己僻澎,他們只是感情好貌踏,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著窟勃,像睡著了一般祖乳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上秉氧,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天眷昆,我揣著相機(jī)與錄音,去河邊找鬼汁咏。 笑死亚斋,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的攘滩。 我是一名探鬼主播帅刊,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼漂问!你這毒婦竟也來(lái)了赖瞒?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蚤假,失蹤者是張志新(化名)和其女友劉穎栏饮,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體勤哗,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡抡爹,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了芒划。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡欧穴,死狀恐怖民逼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情涮帘,我是刑警寧澤拼苍,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響疮鲫,放射性物質(zhì)發(fā)生泄漏吆你。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一俊犯、第九天 我趴在偏房一處隱蔽的房頂上張望妇多。 院中可真熱鬧,春花似錦燕侠、人聲如沸者祖。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)七问。三九已至,卻和暖如春茫舶,著一層夾襖步出監(jiān)牢的瞬間械巡,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工饶氏, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留坟比,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓嚷往,卻偏偏與公主長(zhǎng)得像葛账,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子皮仁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345