學(xué)習(xí)大綱
一指煎、kafka介紹
Kafka最初是由Linkedln公司采用Scala語(yǔ)言開(kāi)發(fā)的一個(gè)多分區(qū)粗蔚、多副本并且基于ZooKeeper協(xié)調(diào)的分布式消息系統(tǒng)尝偎,現(xiàn)在已經(jīng)捐獻(xiàn)給了Apache基金會(huì)。目前Kafka已經(jīng)定位為一個(gè)分布式流式處理平臺(tái)鹏控,它以高吞吐致扯、可持久化、可水平擴(kuò)展当辐、支持流處理等多種特性而被廣泛應(yīng)用抖僵。
Apache Kafka是一個(gè)分布式的發(fā)布-訂閱消息系統(tǒng),能夠支撐海量數(shù)據(jù)的數(shù)據(jù)傳遞缘揪。在離線和實(shí)時(shí)的消息處理業(yè)務(wù)系統(tǒng)中耍群,Kafka都有廣泛的應(yīng)用义桂。Kafka將消息持久化到磁盤(pán)中,并對(duì)消息創(chuàng)建了備份保證了數(shù)據(jù)的安全蹈垢。Kafka在保證了較高的處理速度的同時(shí)慷吊,又能保證數(shù)據(jù)處理的低延遲和數(shù)據(jù)的零丟失。
1曹抬、特性
(1)高吞吐量溉瓶、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒沐祷,每個(gè)主題可以分多個(gè)分區(qū),消費(fèi)組對(duì)分區(qū)進(jìn)行消費(fèi)操作;
(2)可擴(kuò)展性:kafka集群支持熱擴(kuò)展嚷闭,
(3)持久性攒岛、可靠性:消息被持久化到本地磁盤(pán)赖临,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失;
(4)容錯(cuò)性:允許集群中節(jié)點(diǎn)失斣志狻(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失斁ふァ)
(5)高并發(fā):支持?jǐn)?shù)千個(gè)客戶(hù)端同時(shí)讀寫(xiě);
2、使用場(chǎng)景
(1)日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log顺饮,通過(guò)kafka以統(tǒng)一接口服務(wù)的方式開(kāi)放給各種consumer吵聪,例如Hadoop、Hbase兼雄、Solr等;
(2)消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者吟逝、緩存消息等;
(3)用戶(hù)活動(dòng)跟蹤:Kafka經(jīng)常被用來(lái)記錄web用戶(hù)或者app用戶(hù)的各種活動(dòng),如瀏覽網(wǎng)頁(yè)赦肋、搜索块攒、點(diǎn)擊等活動(dòng),這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中佃乘,然后訂閱者通過(guò)訂閱這些topic來(lái)做實(shí)時(shí)的監(jiān)控分析囱井,或者裝載到Hadoop、數(shù)據(jù)倉(cāng)庫(kù)中做離線分析和挖掘;
(4)運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來(lái)記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)趣避。包括收集各種分布式應(yīng)用的數(shù)據(jù)庞呕,生產(chǎn)各種操作的集中反
饋,比如報(bào)警和報(bào)告﹔
(5)流式處理:比如spark streaming和storm;
3程帕、技術(shù)優(yōu)勢(shì)
- 可伸縮性:Kafka的兩個(gè)重要特性造就了它的可伸縮性住练。
1、Kafka集群在運(yùn)行期間可以輕松地?cái)U(kuò)展或收縮(可以添加或刪除代理)愁拭,而不會(huì)宕機(jī)澎羞。
2、可以擴(kuò)展一個(gè)Kafka主題來(lái)包含更多的分區(qū)敛苇。由于一個(gè)分區(qū)無(wú)法擴(kuò)展到多個(gè)代理妆绞,所以它的容量受到代理磁盤(pán)空間的限制顺呕。能夠增加分區(qū)和代理的數(shù)量意味著單個(gè)主題可以存儲(chǔ)的數(shù)據(jù)量是沒(méi)有限制的。 - 容錯(cuò)性和可靠性:
Kafka的設(shè)計(jì)方式使某個(gè)代理的故障能夠被集群中的其他代理檢測(cè)到括饶。由于每個(gè)主題都可以在多個(gè)代理上復(fù)制株茶,所以集群可以在不中斷服務(wù)的情況下從此類(lèi)故障中恢復(fù)并繼續(xù)運(yùn)行。 - 吞吐量
代理能夠以超快的速度有效地存儲(chǔ)和檢索數(shù)據(jù)图焰。
二启盛、概念詳解
1、Producer
生產(chǎn)者即數(shù)據(jù)的發(fā)布者技羔,該角色將消息發(fā)布到Kafka的topic中僵闯。broker接收到生產(chǎn)者發(fā)送的消息后,broker將該消息追加到當(dāng)前用于追加數(shù)據(jù)的segment文件中藤滥。生產(chǎn)者發(fā)送的消息鳖粟,存儲(chǔ)到一個(gè)partition中,生產(chǎn)者也可以指定數(shù)據(jù)存儲(chǔ)的partition拙绊。
2向图、Consumer
消費(fèi)者可以從broker中讀取數(shù)據(jù)。消費(fèi)者可以消費(fèi)多個(gè)topic中的數(shù)據(jù)标沪。
3榄攀、Topic
在Kafka中,使用一個(gè)類(lèi)別屬性來(lái)劃分?jǐn)?shù)據(jù)的所屬類(lèi)金句,劃分?jǐn)?shù)據(jù)的這個(gè)類(lèi)稱(chēng)為topic檩赢。如果把Kafka看做為一個(gè)數(shù)據(jù)庫(kù),topic可以理解為數(shù)據(jù)庫(kù)中的一張表违寞,topic的名字即為表名贞瞒。
4、Partition
topic中的數(shù)據(jù)分割為一個(gè)或多個(gè)partition坞靶。每個(gè)topic至少有一partition憔狞。每個(gè)partition中的數(shù)據(jù)使用多個(gè)segment文件存儲(chǔ)。partition中的數(shù)據(jù)是有序的彰阴,partition間的數(shù)據(jù)丟失了數(shù)據(jù)的順序瘾敢。如果topic有多個(gè)partition,消費(fèi)數(shù)據(jù)時(shí)就不能保證數(shù)據(jù)的順序尿这。在需要嚴(yán)格保證消息的消費(fèi)順序的場(chǎng)景下簇抵,需要將partition數(shù)目設(shè)為1。
5射众、Partition offset
每條消息都有一個(gè)當(dāng)前Partition下唯一的64字節(jié)的offset碟摆,它指明了這條消息的起始位置。
6叨橱、Replicas of partition
副本是一個(gè)分區(qū)的備份典蜕。副本不會(huì)被消費(fèi)者消費(fèi)断盛,副本只用于防止數(shù)據(jù)丟失,即消費(fèi)者不從為follower的partition中消費(fèi)數(shù)據(jù)愉舔,而是從為leader的partition中讀取數(shù)據(jù)钢猛。副本之間是一主多從的關(guān)系。
7轩缤、Broker
Kafka集群包含一個(gè)或多個(gè)服務(wù)器命迈,服務(wù)器節(jié)點(diǎn)稱(chēng)為broker。broker存儲(chǔ)topic的數(shù)據(jù)火的。如果某topic有N個(gè)partition壶愤,集群有N個(gè)broker,那么每個(gè)broker存儲(chǔ)該topic的一個(gè)partition馏鹤。如果某topic有N個(gè)partition征椒,集群有(N+M)個(gè)broker,那么其中有N個(gè)broker存儲(chǔ)該topic的一個(gè)partition假瞬,剩下的M個(gè)broker不存儲(chǔ)該topic的partition數(shù)據(jù)陕靠。如果某topic有N個(gè)partition迂尝,集群中broker數(shù)目少于N個(gè)脱茉,那么一個(gè)broker存儲(chǔ)該topic的一個(gè)或多個(gè)partition。在實(shí)際生產(chǎn)環(huán)境中垄开,盡量避免這種情況的發(fā)生琴许,這種情況容易導(dǎo)致Kafka集群數(shù)據(jù)不均衡。
8溉躲、Leader
每個(gè)partition有多個(gè)副本榜田,其中有且僅有一個(gè)作為L(zhǎng)eader,Leader是當(dāng)前負(fù)責(zé)數(shù)據(jù)的讀寫(xiě)的partition锻梳。
9箭券、Follower
Follower跟隨Leader,所有寫(xiě)請(qǐng)求都通過(guò)Leader路由疑枯,數(shù)據(jù)變更會(huì)廣播給所有Follower辩块,F(xiàn)ollower與Leader保持?jǐn)?shù)據(jù)同步。如果Leader失效荆永,則從Follower中選舉出一個(gè)新的Leader废亭。當(dāng)Follower與Leader掛掉、卡住或者同步太慢具钥,leader會(huì)把這個(gè)follower從"in sync replicas”(ISR)列表中刪除豆村,重新創(chuàng)建一個(gè)Follower。
10骂删、zookeeper
Zookeeper負(fù)責(zé)維護(hù)和協(xié)調(diào)broker掌动。當(dāng)Kafka系統(tǒng)中新增了broker或者某個(gè)broker發(fā)生故障失效時(shí)四啰,由ZooKeeper通知生產(chǎn)者和消費(fèi)者。生產(chǎn)者和消費(fèi)者依據(jù)Zookeeper的broker狀態(tài)信息與broker協(xié)調(diào)數(shù)據(jù)的發(fā)布和訂閱任務(wù)粗恢。
11拟逮、AR(Assigned Replicas)
分區(qū)中所有的副本統(tǒng)稱(chēng)為AR。
12适滓、ISR(In-Sync Replicas)
所有與Leader部分保持一定程度的副(包括Leader副本在內(nèi))本組成ISR敦迄。
13、OSR(Out-of-Sync-Replicas)
與Leader副本同步滯后過(guò)多的副本凭迹。
14罚屋、HW(High Watermark)
高水位,標(biāo)識(shí)了一個(gè)特定的offset嗅绸,消費(fèi)者只能拉取到這個(gè)offset之前的消息脾猛。
15、LEO(Log End Offset)
即日志末端位移(log end offset)鱼鸠,記錄了該副本底層日志(log)中下一條消息的位移值猛拴。注意是下一條消息!也就是說(shuō),如果LEO=10蚀狰,那么表示該副本保存了10條消息愉昆,位移值范圍是[0,9]麻蹋。
三跛溉、安裝與配置
1、使用Kafka需求有以下前置條件
- Linux系統(tǒng)/windows系統(tǒng)(博主選的是windows)
- java環(huán)境
- zookeeper
zookeeper下載地址
注意著兩個(gè)文件都要下載扮授,并把a(bǔ)pache-zookeeper-3.5.8-bin.tar包里面的lib放到apache-zookeeper-3.5.8.tar.gz里面芳室,不然在運(yùn)行zookeeper的時(shí)候如果閃退。如果閃退可以在配置文件上寫(xiě)上pause刹勃,這樣方便查看錯(cuò)誤
錯(cuò)誤: 找不到或無(wú)法加載主類(lèi) org.apache.zookeeper.server.quorum.QuorumPeerMain
- Kafka環(huán)境安裝
Kafka下載地址
將下載文件解壓后運(yùn)行如下命令,即可啟動(dòng)成功
.\bin\windows\kafka-server-start.bat .\config\server.properties
2堪侯、Kafka測(cè)試消息生產(chǎn)與消費(fèi)
- 創(chuàng)建主題
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic haijia--partitions 2--replication-factor 1
--zookeeper:指定了Kafka所連接的Zookeeper服務(wù)地址
--create:創(chuàng)建主題的動(dòng)作指令
--topic:指定了所要?jiǎng)?chuàng)建主題的名稱(chēng)
--partitions:指定了分區(qū)個(gè)數(shù)
--replication-factor:指定了副本因子
- 展示所有的主題
.bin/kafka-topics.sh --zookeeper localhost:2181 --list
- 查看主題詳情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic haijia
- 刪除主題
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic haijia
需要 server.properties 中設(shè)置 delete.topic.enable=true 否則只是標(biāo)記刪除。
若delete.topic.enable=true荔仁,直接徹底刪除該Topic伍宦。
若delete.topic.enable=false ,如果當(dāng)前Topic沒(méi)有使用過(guò)即沒(méi)有傳輸過(guò)信息:可以徹底刪除咕晋。如果當(dāng)前Topic有使用過(guò)即有過(guò)傳輸過(guò)信息雹拄,并沒(méi)有真正刪除Topic只是把這個(gè)Topic標(biāo)記為刪除(marked for deletion),重啟Kafka Server后刪除掌呜。
- 增加分區(qū)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic haijia --partitions 6
修改分區(qū)數(shù)時(shí)滓玖,僅能增加分區(qū)個(gè)數(shù)。若是用其減少 partition個(gè)數(shù)质蕉,則會(huì)報(bào)錯(cuò)誤信息
- 啟動(dòng)消費(fèi)段接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic haijia
--bootstrap-server 指定了連接Kafka集群的地址
-topic 指定了消費(fèi)端訂閱的主題
- 生產(chǎn)端發(fā)送消息
bin/kafka-console-pfoducer.sh --broker-list localhost:9092--topic haijia
--broker-list 指定了連接的Kafka集群的地址
--topic指定了發(fā)送消息時(shí)的主題
四势篡、Java第一個(gè)程序
1翩肌、快速啟動(dòng)環(huán)境
D:\Software\apacheZookeeper3.5.8\apache-zookeeper-3.5.8\bin>zkServer.cmd
D:\Software\kafka_2.13-2.6.0>.\bin\windows\kafka-server-start.bat .\config\server.properties
1、新建項(xiàng)目
也可以在pom.xml引入依賴(lài)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.haijia</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.0.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.0.RELEASE</version>
<configuration>
<mainClass>com.haijia.kafka.KafkaApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2禁悠、application.yml中引入kafka相關(guān)配置
spring:
kafka:
#如果是集群念祭,用 ,分隔碍侦,如:112.126.74.249:9092,112.126.74.249:9093,172.101.203.33:9092
bootstrap-servers: 127.0.0.1:9092
producer:
# 發(fā)生錯(cuò)誤后粱坤,消息重發(fā)的次數(shù)。
retries: 0
#當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí)瓷产,生產(chǎn)者會(huì)把它們放在同一個(gè)批次里站玄。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算濒旦。
batch-size: 16384
# 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小株旷。
buffer-memory: 33554432
# 鍵的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 應(yīng)答級(jí)別:
# acks=0 : 生產(chǎn)者在成功寫(xiě)入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)。
# acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息尔邓,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器成功響應(yīng)晾剖。
# acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)梯嗽。
acks: 1
#properties:
# 自定義分區(qū)器
# partitioner-class: com.felix.kafka.producer.CustomizePartitioner
# 提交延時(shí):當(dāng)生產(chǎn)端積累的消息達(dá)到batch-size或接收到消息linger.ms后,生產(chǎn)者就會(huì)將消息提交給kafka,
# linger.ms為0表示每接收到一條消息就提交給kafka,這時(shí)候batch-size其實(shí)就沒(méi)用了
# linger-ms: 0
consumer:
# 自動(dòng)提交的時(shí)間間隔 在spring boot 2.X 版本中這里采用的是值的類(lèi)型為Duration 需要符合特定的格式齿尽,如1S,1M,2H,5D
auto-commit-interval: 1S
# spring.kafka.consumer.enable-auto-commit=true->是否自動(dòng)提交offset
# spring.kafka.consumer.auto.commit.interval.ms=1000->提交offset延時(shí)(接收到消息后多久提交offset)
# spring.kafka.consumer.properties.session.timeout.ms=120000->消費(fèi)會(huì)話超時(shí)時(shí)間(超過(guò)這個(gè)時(shí)間consumer沒(méi)有發(fā)送心跳,就會(huì)觸發(fā)rebalance操作)
# spring.kafka.consumer.properties.request.timeout.ms=18000->消費(fèi)請(qǐng)求超時(shí)時(shí)間
# 該屬性指定了消費(fèi)者在讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下該作何處理:
# latest(默認(rèn)值)在偏移量無(wú)效的情況下,消費(fèi)者將從最新的記錄開(kāi)始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)
# earliest :在偏移量無(wú)效的情況下慷荔,消費(fèi)者將從起始位置讀取分區(qū)的記錄
auto-offset-reset: earliest
# 是否自動(dòng)提交偏移量雕什,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失缠俺,可以把它設(shè)置為false,然后手動(dòng)提交偏移量
enable-auto-commit: false
# 鍵的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在偵聽(tīng)器容器中運(yùn)行的線程數(shù)显晶。
concurrency: 5
#listner負(fù)責(zé)ack,每調(diào)用一次壹士,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
swagger:
enabled: true
3磷雇、producer
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
@Slf4j
public class KafkaProducer {
//自定義topic
public static final String TOPIC_TEST = "topic.test";
public static final String TOPIC_GROUP1 = "topic.group1";
public static final String TOPIC_GROUP2 = "topic.group2";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void send(Object obj) {
String obj2String = JSONObject.toJSONString(obj);
log.info("準(zhǔn)備發(fā)送消息為:{}", obj2String);
//發(fā)送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//發(fā)送失敗的處理
log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失敗:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//成功的處理
log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + stringObjectSendResult.toString());
}
});
}
}
4躏救、Consumer
package com.haijia.kafka.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info(KafkaProducer.TOPIC_GROUP1+"KafkaConsumer 接收到消息");
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test 消費(fèi)了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info(KafkaProducer.TOPIC_GROUP2+"KafkaConsumer 接收到消息");
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test1 消費(fèi)了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
}
5唯笙、Controller
import com.haijia.kafka.kafka.KafkaProducer;
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
import java.util.Map;
@RequestMapping("/kafka")
@Controller
@Slf4j
@Api(value = "SwaggerValue", tags = "KafkaController", description = "測(cè)試接口相關(guān)")//,produces = Media
//Api【tags=“說(shuō)明該類(lèi)的作用,可以在UI界面上看到的注解”】
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@GetMapping(value = "/send")
@ResponseBody
@Transactional(rollbackFor = Exception.class)
@ApiOperation(value = "kafka發(fā)送消息", notes = "發(fā)送消息")
//ApiOperation【value=“說(shuō)明方法的用途盒使、作用”;notes=“方法的備注說(shuō)明】
@ApiImplicitParam(name = "id", value = "用戶(hù)ID", required = false, dataType = "Long", paramType = "query")
//ApiImplicitParam【name:參數(shù)名崩掘;value:參數(shù)的漢字說(shuō)明、解釋?zhuān)籨ataType: 參數(shù)類(lèi)型少办,默認(rèn)String苞慢;required : 參數(shù)是否必傳,true必傳
// defaultValue:參數(shù)的默認(rèn)值英妓;paramType:參數(shù)放在哪個(gè)地方挽放;header請(qǐng)求參數(shù)的獲取:@RequestHeader,參數(shù)從請(qǐng)求頭獲取
// query請(qǐng)求參數(shù)的獲取:@RequestParam,參數(shù)從地址欄問(wèn)號(hào)后面的參數(shù)獲壬苋;path(用于restful接口)請(qǐng)求參數(shù)的獲燃琛:@PathVariable吗蚌,參數(shù)從URL地址上獲取
// body(不常用)參數(shù)從請(qǐng)求體中獲取纯出;form(不常用)參數(shù)從form表單中獲取】
//@ApiIgnore
//ApiIgnore: 使用該注解忽略這個(gè)API蚯妇,不會(huì)生成接口文檔≡蒹荩可注解在類(lèi)和方法上
//@ApiResponse(code = 400,message = "參數(shù)錯(cuò)誤",response = "拋出異常的類(lèi)")
//ApiResponse:響應(yīng)介紹
//@ApiResponses(一組響應(yīng))
//@ApiModel(用在返回對(duì)象類(lèi)上侮措,描述一個(gè)Model的信息(這種一般用在post創(chuàng)建的時(shí)候,使用@RequestBody這樣的場(chǎng)景乖杠,請(qǐng)求參數(shù)無(wú)法使用@ApiImplicitParam注解進(jìn)行描述的時(shí)候)
//@ApiModelProperty[value = 字段說(shuō)明,name = 重寫(xiě)屬性名字;dataType = 重寫(xiě)屬性類(lèi)型;required = 是否必填分扎,true必填
// example = 舉例說(shuō)明;hidden = 隱藏]
public void sendMsg() {
kafkaProducer.send("this is a kafka top message!");
}
/* @RequestMapping(value = "/save", method = RequestMethod.POST)
//@ApiImplicitParam(name = "user", value = "用戶(hù)實(shí)體user", required = true, dataType = "User")
@ApiOperation(value = "創(chuàng)建用戶(hù)", notes = "創(chuàng)建用戶(hù)")
public Map<String, Object> saveUser(@ApiParam(required = true, name = "user", value = "用戶(hù)實(shí)體user") @RequestBody @Valid Object user) {return null;}*/
}
6、Swagger
package com.haijia.kafka.common;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import springfox.documentation.spi.DocumentationType;
@Configuration
//注解開(kāi)啟 swagger2 功能,啟動(dòng)后訪問(wèn) http://localhost:8080/swagger-ui.html
@EnableSwagger2
public class Swagger2Config {
//是否開(kāi)啟swagger胧洒,正式環(huán)境一般是需要關(guān)閉的
@Value("${swagger.enabled}")
private boolean enableSwagger;
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
//是否開(kāi)啟 (true 開(kāi)啟 false隱藏畏吓。生產(chǎn)環(huán)境建議隱藏)
.enable(enableSwagger)
.select()
//掃描的路徑包,設(shè)置basePackage會(huì)將包下的所有被@Api標(biāo)記類(lèi)的所有方法作為api
.apis(RequestHandlerSelectors.basePackage("com.haijia.kafka.controller"))
//指定路徑處理PathSelectors.any()代表所有的路徑
.paths(PathSelectors.any())
.build();
}
/**
*
* - swagger.title=標(biāo)題
* - swagger.description=描述
* - swagger.version=版本
* - swagger.license=許可證
* - swagger.licenseUrl=許可證URL
* - swagger.termsOfServiceUrl=服務(wù)條款URL
* - swagger.contact.name=維護(hù)人
* - swagger.contact.url=維護(hù)人URL
* - swagger.contact.email=維護(hù)人email
* - swagger.base-package=swagger掃描的基礎(chǔ)包,默認(rèn):全掃描
* - swagger.base-path=需要處理的基礎(chǔ)URL規(guī)則卫漫,默認(rèn):/**
* - swagger.exclude-path=需要排除的URL規(guī)則菲饼,默認(rèn):空
* - swagger.host=文檔的host信息,默認(rèn):空
*/
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
//設(shè)置文檔標(biāo)題(API名稱(chēng))
.title("SpringBoot中使用Swagger2構(gòu)建RESTful接口")
//文檔描述
.description("接口說(shuō)明")
//服務(wù)條款URL
.termsOfServiceUrl("http://127.0.0.1:8080/")
//聯(lián)系信息
.contact(new Contact("黃海佳","www.baidu.com","9218*413@qq.com"))
//版本號(hào)
.version("1.0.0")
.build();
}
}